From f4c8c6cf89e401d75d0ffb72aaf351dc947d52ee Mon Sep 17 00:00:00 2001 From: kixelated Date: Tue, 23 May 2023 12:04:27 -0700 Subject: [PATCH] Tabs not spaces u nerds. (#18) --- .editorconfig | 10 + .github/workflows/server.yml | 6 +- .github/workflows/web.yml | 3 - server/.rustfmt.toml | 4 + server/.vscode/settings.json | 3 - server/src/main.rs | 42 +-- server/src/media/source.rs | 314 ++++++++-------- server/src/session/message.rs | 38 +- server/src/session/mod.rs | 218 ++++++----- server/src/transport/app.rs | 8 +- server/src/transport/connection.rs | 6 +- server/src/transport/server.rs | 548 ++++++++++++++-------------- server/src/transport/streams.rs | 207 +++++------ web/.eslintrc.cjs | 43 +-- web/.prettierrc.json | 3 - web/.prettierrc.yaml | 4 + web/.proxyrc.js | 10 +- web/package.json | 51 +-- web/src/broadcaster/encoder.ts | 158 ++++---- web/src/broadcaster/index.ts | 6 +- web/src/index.css | 72 ++-- web/src/index.html | 48 +-- web/src/index.ts | 26 +- web/src/mp4/index.ts | 22 +- web/src/mp4/init.ts | 58 +-- web/src/mp4/mp4box.d.ts | 390 ++++++++++---------- web/src/player/audio.ts | 121 +++--- web/src/player/decoder.ts | 260 +++++++------ web/src/player/index.ts | 121 +++--- web/src/player/message.ts | 16 +- web/src/player/renderer.ts | 42 +-- web/src/player/ring.ts | 232 ++++++------ web/src/player/video.ts | 149 ++++---- web/src/player/worker.ts | 28 +- web/src/player/worklet.ts | 68 ++-- web/src/stream/reader.ts | 323 ++++++++-------- web/src/stream/writer.ts | 160 ++++---- web/src/transport/index.ts | 142 +++---- web/src/transport/interface.ts | 12 +- web/src/transport/message.ts | 2 +- web/src/transport/webtransport.d.ts | 82 ++--- web/src/util/deferred.ts | 32 +- web/tsconfig.json | 14 +- web/yarn.lock | 19 + 44 files changed, 2027 insertions(+), 2094 deletions(-) create mode 100644 .editorconfig create mode 100644 server/.rustfmt.toml delete mode 100644 server/.vscode/settings.json delete mode 100644 web/.prettierrc.json create mode 100644 web/.prettierrc.yaml diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..6773ee4 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,10 @@ +root = true + +[*] +charset = utf-8 +end_of_line = lf +trim_trailing_whitespace = true +insert_final_newline = true +indent_style = tab +indent_size = 4 +max_line_length = 120 diff --git a/.github/workflows/server.yml b/.github/workflows/server.yml index f0a69fa..fe72953 100644 --- a/.github/workflows/server.yml +++ b/.github/workflows/server.yml @@ -26,8 +26,8 @@ jobs: - name: test run: cargo test --verbose - - name: fmt - run: cargo fmt --check - - name: clippy run: cargo clippy + + - name: fmt + run: cargo fmt --check diff --git a/.github/workflows/web.yml b/.github/workflows/web.yml index b9053ad..40fd3b5 100644 --- a/.github/workflows/web.yml +++ b/.github/workflows/web.yml @@ -25,8 +25,5 @@ jobs: - name: build run: yarn build - - name: fmt - run: yarn prettier --check . - - name: lint run: yarn lint diff --git a/server/.rustfmt.toml b/server/.rustfmt.toml new file mode 100644 index 0000000..276903b --- /dev/null +++ b/server/.rustfmt.toml @@ -0,0 +1,4 @@ +# i die on this hill +hard_tabs = true + +max_width = 120 \ No newline at end of file diff --git a/server/.vscode/settings.json b/server/.vscode/settings.json deleted file mode 100644 index 4d9636b..0000000 --- a/server/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "rust-analyzer.showUnlinkedFileNotification": false -} \ No newline at end of file diff --git a/server/src/main.rs b/server/src/main.rs index 373474c..4c659c1 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -5,34 +5,34 @@ use clap::Parser; /// Search for a pattern in a file and display the lines that contain it. #[derive(Parser)] struct Cli { - /// Listen on this address - #[arg(short, long, default_value = "[::]:4443")] - addr: String, + /// Listen on this address + #[arg(short, long, default_value = "[::]:4443")] + addr: String, - /// Use the certificate file at this path - #[arg(short, long, default_value = "../cert/localhost.crt")] - cert: String, + /// Use the certificate file at this path + #[arg(short, long, default_value = "../cert/localhost.crt")] + cert: String, - /// Use the private key at this path - #[arg(short, long, default_value = "../cert/localhost.key")] - key: String, + /// Use the private key at this path + #[arg(short, long, default_value = "../cert/localhost.key")] + key: String, - /// Use the media file at this path - #[arg(short, long, default_value = "../media/fragmented.mp4")] - media: String, + /// Use the media file at this path + #[arg(short, long, default_value = "../media/fragmented.mp4")] + media: String, } fn main() -> anyhow::Result<()> { - env_logger::init(); + env_logger::init(); - let args = Cli::parse(); + let args = Cli::parse(); - let server_config = transport::Config { - addr: args.addr, - cert: args.cert, - key: args.key, - }; + let server_config = transport::Config { + addr: args.addr, + cert: args.cert, + key: args.key, + }; - let mut server = transport::Server::::new(server_config).unwrap(); - server.run() + let mut server = transport::Server::::new(server_config).unwrap(); + server.run() } diff --git a/server/src/media/source.rs b/server/src/media/source.rs index ebbb5e4..660647e 100644 --- a/server/src/media/source.rs +++ b/server/src/media/source.rs @@ -8,223 +8,219 @@ use mp4; use mp4::ReadBox; pub struct Source { - // We read the file once, in order, and don't seek backwards. - reader: io::BufReader, + // We read the file once, in order, and don't seek backwards. + reader: io::BufReader, - // The timestamp when the broadcast "started", so we can sleep to simulate a live stream. - start: time::Instant, + // The timestamp when the broadcast "started", so we can sleep to simulate a live stream. + start: time::Instant, - // The initialization payload; ftyp + moov boxes. - pub init: Vec, + // The initialization payload; ftyp + moov boxes. + pub init: Vec, - // The timescale used for each track. - timescales: HashMap, + // The timescale used for each track. + timescales: HashMap, - // Any fragments parsed and ready to be returned by next(). - fragments: VecDeque, + // Any fragments parsed and ready to be returned by next(). + fragments: VecDeque, } pub struct Fragment { - // The track ID for the fragment. - pub track_id: u32, + // The track ID for the fragment. + pub track_id: u32, - // The data of the fragment. - pub data: Vec, + // The data of the fragment. + pub data: Vec, - // Whether this fragment is a keyframe. - pub keyframe: bool, + // Whether this fragment is a keyframe. + pub keyframe: bool, - // The timestamp of the fragment, in milliseconds, to simulate a live stream. - pub timestamp: u64, + // The timestamp of the fragment, in milliseconds, to simulate a live stream. + pub timestamp: u64, } impl Source { - pub fn new(path: &str) -> anyhow::Result { - let f = fs::File::open(path)?; - let mut reader = io::BufReader::new(f); - let start = time::Instant::now(); + pub fn new(path: &str) -> anyhow::Result { + let f = fs::File::open(path)?; + let mut reader = io::BufReader::new(f); + let start = time::Instant::now(); - let ftyp = read_atom(&mut reader)?; - anyhow::ensure!(&ftyp[4..8] == b"ftyp", "expected ftyp atom"); + let ftyp = read_atom(&mut reader)?; + anyhow::ensure!(&ftyp[4..8] == b"ftyp", "expected ftyp atom"); - let moov = read_atom(&mut reader)?; - anyhow::ensure!(&moov[4..8] == b"moov", "expected moov atom"); + let moov = read_atom(&mut reader)?; + anyhow::ensure!(&moov[4..8] == b"moov", "expected moov atom"); - let mut init = ftyp; - init.extend(&moov); + let mut init = ftyp; + init.extend(&moov); - // We're going to parse the moov box. - // We have to read the moov box header to correctly advance the cursor for the mp4 crate. - let mut moov_reader = io::Cursor::new(&moov); - let moov_header = mp4::BoxHeader::read(&mut moov_reader)?; + // We're going to parse the moov box. + // We have to read the moov box header to correctly advance the cursor for the mp4 crate. + let mut moov_reader = io::Cursor::new(&moov); + let moov_header = mp4::BoxHeader::read(&mut moov_reader)?; - // Parse the moov box so we can detect the timescales for each track. - let moov = mp4::MoovBox::read_box(&mut moov_reader, moov_header.size)?; + // Parse the moov box so we can detect the timescales for each track. + let moov = mp4::MoovBox::read_box(&mut moov_reader, moov_header.size)?; - Ok(Self { - reader, - start, - init, - timescales: timescales(&moov), - fragments: VecDeque::new(), - }) - } + Ok(Self { + reader, + start, + init, + timescales: timescales(&moov), + fragments: VecDeque::new(), + }) + } - pub fn fragment(&mut self) -> anyhow::Result> { - if self.fragments.is_empty() { - self.parse()?; - }; + pub fn fragment(&mut self) -> anyhow::Result> { + if self.fragments.is_empty() { + self.parse()?; + }; - if self.timeout().is_some() { - return Ok(None); - } + if self.timeout().is_some() { + return Ok(None); + } - Ok(self.fragments.pop_front()) - } + Ok(self.fragments.pop_front()) + } - fn parse(&mut self) -> anyhow::Result<()> { - loop { - let atom = read_atom(&mut self.reader)?; + fn parse(&mut self) -> anyhow::Result<()> { + loop { + let atom = read_atom(&mut self.reader)?; - let mut reader = io::Cursor::new(&atom); - let header = mp4::BoxHeader::read(&mut reader)?; + let mut reader = io::Cursor::new(&atom); + let header = mp4::BoxHeader::read(&mut reader)?; - match header.name { - mp4::BoxType::FtypBox | mp4::BoxType::MoovBox => { - anyhow::bail!("must call init first") - } - mp4::BoxType::MoofBox => { - let moof = mp4::MoofBox::read_box(&mut reader, header.size)?; + match header.name { + mp4::BoxType::FtypBox | mp4::BoxType::MoovBox => { + anyhow::bail!("must call init first") + } + mp4::BoxType::MoofBox => { + let moof = mp4::MoofBox::read_box(&mut reader, header.size)?; - if moof.trafs.len() != 1 { - // We can't split the mdat atom, so this is impossible to support - anyhow::bail!("multiple tracks per moof atom") - } + if moof.trafs.len() != 1 { + // We can't split the mdat atom, so this is impossible to support + anyhow::bail!("multiple tracks per moof atom") + } - self.fragments.push_back(Fragment { - track_id: moof.trafs[0].tfhd.track_id, - data: atom, - keyframe: has_keyframe(&moof), - timestamp: first_timestamp(&moof).expect("couldn't find timestamp"), - }) - } - mp4::BoxType::MdatBox => { - let moof = self.fragments.back().expect("no atom before mdat"); + self.fragments.push_back(Fragment { + track_id: moof.trafs[0].tfhd.track_id, + data: atom, + keyframe: has_keyframe(&moof), + timestamp: first_timestamp(&moof).expect("couldn't find timestamp"), + }) + } + mp4::BoxType::MdatBox => { + let moof = self.fragments.back().expect("no atom before mdat"); - self.fragments.push_back(Fragment { - track_id: moof.track_id, - data: atom, - keyframe: false, - timestamp: moof.timestamp, - }); + self.fragments.push_back(Fragment { + track_id: moof.track_id, + data: atom, + keyframe: false, + timestamp: moof.timestamp, + }); - // We have some media data, return so we can start sending it. - return Ok(()); - } - _ => { - // Skip unknown atoms - } - } - } - } + // We have some media data, return so we can start sending it. + return Ok(()); + } + _ => { + // Skip unknown atoms + } + } + } + } - // Simulate a live stream by sleeping until the next timestamp in the media. - pub fn timeout(&self) -> Option { - let next = self.fragments.front()?; - let timestamp = next.timestamp; + // Simulate a live stream by sleeping until the next timestamp in the media. + pub fn timeout(&self) -> Option { + let next = self.fragments.front()?; + let timestamp = next.timestamp; - // Find the timescale for the track. - let timescale = self.timescales.get(&next.track_id).unwrap(); + // Find the timescale for the track. + let timescale = self.timescales.get(&next.track_id).unwrap(); - let delay = time::Duration::from_millis(1000 * timestamp / *timescale as u64); - let elapsed = self.start.elapsed(); + let delay = time::Duration::from_millis(1000 * timestamp / *timescale as u64); + let elapsed = self.start.elapsed(); - delay.checked_sub(elapsed) - } + delay.checked_sub(elapsed) + } } // Read a full MP4 atom into a vector. pub fn read_atom(reader: &mut R) -> anyhow::Result> { - // Read the 8 bytes for the size + type - let mut buf = [0u8; 8]; - reader.read_exact(&mut buf)?; + // Read the 8 bytes for the size + type + let mut buf = [0u8; 8]; + reader.read_exact(&mut buf)?; - // Convert the first 4 bytes into the size. - let size = u32::from_be_bytes(buf[0..4].try_into()?) as u64; - //let typ = &buf[4..8].try_into().ok().unwrap(); + // Convert the first 4 bytes into the size. + let size = u32::from_be_bytes(buf[0..4].try_into()?) as u64; + //let typ = &buf[4..8].try_into().ok().unwrap(); - let mut raw = buf.to_vec(); + let mut raw = buf.to_vec(); - let mut limit = match size { - // Runs until the end of the file. - 0 => reader.take(u64::MAX), + let mut limit = match size { + // Runs until the end of the file. + 0 => reader.take(u64::MAX), - // The next 8 bytes are the extended size to be used instead. - 1 => { - reader.read_exact(&mut buf)?; - let size_large = u64::from_be_bytes(buf); - anyhow::ensure!( - size_large >= 16, - "impossible extended box size: {}", - size_large - ); + // The next 8 bytes are the extended size to be used instead. + 1 => { + reader.read_exact(&mut buf)?; + let size_large = u64::from_be_bytes(buf); + anyhow::ensure!(size_large >= 16, "impossible extended box size: {}", size_large); - reader.take(size_large - 16) - } + reader.take(size_large - 16) + } - 2..=7 => { - anyhow::bail!("impossible box size: {}", size) - } + 2..=7 => { + anyhow::bail!("impossible box size: {}", size) + } - // Otherwise read based on the size. - size => reader.take(size - 8), - }; + // Otherwise read based on the size. + size => reader.take(size - 8), + }; - // Append to the vector and return it. - limit.read_to_end(&mut raw)?; + // Append to the vector and return it. + limit.read_to_end(&mut raw)?; - Ok(raw) + Ok(raw) } fn has_keyframe(moof: &mp4::MoofBox) -> bool { - for traf in &moof.trafs { - // TODO trak default flags if this is None - let default_flags = traf.tfhd.default_sample_flags.unwrap_or_default(); - let trun = match &traf.trun { - Some(t) => t, - None => return false, - }; + for traf in &moof.trafs { + // TODO trak default flags if this is None + let default_flags = traf.tfhd.default_sample_flags.unwrap_or_default(); + let trun = match &traf.trun { + Some(t) => t, + None => return false, + }; - for i in 0..trun.sample_count { - let mut flags = match trun.sample_flags.get(i as usize) { - Some(f) => *f, - None => default_flags, - }; + for i in 0..trun.sample_count { + let mut flags = match trun.sample_flags.get(i as usize) { + Some(f) => *f, + None => default_flags, + }; - if i == 0 && trun.first_sample_flags.is_some() { - flags = trun.first_sample_flags.unwrap(); - } + if i == 0 && trun.first_sample_flags.is_some() { + flags = trun.first_sample_flags.unwrap(); + } - // https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177 - let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther - let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample + // https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177 + let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther + let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample - if keyframe && !non_sync { - return true; - } - } - } + if keyframe && !non_sync { + return true; + } + } + } - false + false } fn first_timestamp(moof: &mp4::MoofBox) -> Option { - Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time) + Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time) } fn timescales(moov: &mp4::MoovBox) -> HashMap { - moov.traks - .iter() - .map(|trak| (trak.tkhd.track_id, trak.mdia.mdhd.timescale)) - .collect() + moov.traks + .iter() + .map(|trak| (trak.tkhd.track_id, trak.mdia.mdhd.timescale)) + .collect() } diff --git a/server/src/session/message.rs b/server/src/session/message.rs index 0849e2e..63faf73 100644 --- a/server/src/session/message.rs +++ b/server/src/session/message.rs @@ -2,8 +2,8 @@ use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] pub struct Message { - pub init: Option, - pub segment: Option, + pub init: Option, + pub segment: Option, } #[derive(Serialize, Deserialize)] @@ -11,27 +11,27 @@ pub struct Init {} #[derive(Serialize, Deserialize)] pub struct Segment { - pub track_id: u32, + pub track_id: u32, } impl Message { - pub fn new() -> Self { - Message { - init: None, - segment: None, - } - } + pub fn new() -> Self { + Message { + init: None, + segment: None, + } + } - pub fn serialize(&self) -> anyhow::Result> { - let str = serde_json::to_string(self)?; - let bytes = str.as_bytes(); - let size = bytes.len() + 8; + pub fn serialize(&self) -> anyhow::Result> { + let str = serde_json::to_string(self)?; + let bytes = str.as_bytes(); + let size = bytes.len() + 8; - let mut out = Vec::with_capacity(size); - out.extend_from_slice(&(size as u32).to_be_bytes()); - out.extend_from_slice(b"warp"); - out.extend_from_slice(bytes); + let mut out = Vec::with_capacity(size); + out.extend_from_slice(&(size as u32).to_be_bytes()); + out.extend_from_slice(b"warp"); + out.extend_from_slice(bytes); - Ok(out) - } + Ok(out) + } } diff --git a/server/src/session/mod.rs b/server/src/session/mod.rs index e5c7cb0..2a28200 100644 --- a/server/src/session/mod.rs +++ b/server/src/session/mod.rs @@ -10,145 +10,141 @@ use crate::{media, transport}; #[derive(Default)] pub struct Session { - media: Option, - streams: transport::Streams, // An easy way of buffering stream data. - tracks: hmap::HashMap, // map from track_id to current stream_id + media: Option, + streams: transport::Streams, // An easy way of buffering stream data. + tracks: hmap::HashMap, // map from track_id to current stream_id } impl transport::App for Session { - // Process any updates to a session. - fn poll( - &mut self, - conn: &mut quiche::Connection, - session: &mut webtransport::ServerSession, - ) -> anyhow::Result<()> { - loop { - let event = match session.poll(conn) { - Err(webtransport::Error::Done) => break, - Err(e) => return Err(e.into()), - Ok(e) => e, - }; + // Process any updates to a session. + fn poll(&mut self, conn: &mut quiche::Connection, session: &mut webtransport::ServerSession) -> anyhow::Result<()> { + loop { + let event = match session.poll(conn) { + Err(webtransport::Error::Done) => break, + Err(e) => return Err(e.into()), + Ok(e) => e, + }; - log::debug!("webtransport event {:?}", event); + log::debug!("webtransport event {:?}", event); - match event { - webtransport::ServerEvent::ConnectRequest(_req) => { - // you can handle request with - // req.authority() - // req.path() - // and you can validate this request with req.origin() - session.accept_connect_request(conn, None)?; + match event { + webtransport::ServerEvent::ConnectRequest(_req) => { + // you can handle request with + // req.authority() + // req.path() + // and you can validate this request with req.origin() + session.accept_connect_request(conn, None)?; - // TODO - let media = media::Source::new("../media/fragmented.mp4")?; - let init = &media.init; + // TODO + let media = media::Source::new("../media/fragmented.mp4")?; + let init = &media.init; - // Create a JSON header. - let mut message = message::Message::new(); - message.init = Some(message::Init {}); - let data = message.serialize()?; + // Create a JSON header. + let mut message = message::Message::new(); + message.init = Some(message::Init {}); + let data = message.serialize()?; - // Create a new stream and write the header. - let stream_id = session.open_stream(conn, false)?; - self.streams.send(conn, stream_id, data.as_slice(), false)?; - self.streams.send(conn, stream_id, init.as_slice(), true)?; + // Create a new stream and write the header. + let stream_id = session.open_stream(conn, false)?; + self.streams.send(conn, stream_id, data.as_slice(), false)?; + self.streams.send(conn, stream_id, init.as_slice(), true)?; - self.media = Some(media); - } - webtransport::ServerEvent::StreamData(stream_id) => { - let mut buf = vec![0; 10000]; - while let Ok(len) = session.recv_stream_data(conn, stream_id, &mut buf) { - let _stream_data = &buf[0..len]; - } - } + self.media = Some(media); + } + webtransport::ServerEvent::StreamData(stream_id) => { + let mut buf = vec![0; 10000]; + while let Ok(len) = session.recv_stream_data(conn, stream_id, &mut buf) { + let _stream_data = &buf[0..len]; + } + } - _ => {} - } - } + _ => {} + } + } - // Send any pending stream data. - // NOTE: This doesn't return an error because it's async, and would be confusing. - self.streams.poll(conn); + // Send any pending stream data. + // NOTE: This doesn't return an error because it's async, and would be confusing. + self.streams.poll(conn); - // Fetch the next media fragment, possibly queuing up stream data. - self.poll_source(conn, session)?; + // Fetch the next media fragment, possibly queuing up stream data. + self.poll_source(conn, session)?; - Ok(()) - } + Ok(()) + } - fn timeout(&self) -> Option { - self.media.as_ref().and_then(|m| m.timeout()) - } + fn timeout(&self) -> Option { + self.media.as_ref().and_then(|m| m.timeout()) + } } impl Session { - fn poll_source( - &mut self, - conn: &mut quiche::Connection, - session: &mut webtransport::ServerSession, - ) -> anyhow::Result<()> { - // Get the media source once the connection is established. - let media = match &mut self.media { - Some(m) => m, - None => return Ok(()), - }; + fn poll_source( + &mut self, + conn: &mut quiche::Connection, + session: &mut webtransport::ServerSession, + ) -> anyhow::Result<()> { + // Get the media source once the connection is established. + let media = match &mut self.media { + Some(m) => m, + None => return Ok(()), + }; - // Get the next media fragment. - let fragment = match media.fragment()? { - Some(f) => f, - None => return Ok(()), - }; + // Get the next media fragment. + let fragment = match media.fragment()? { + Some(f) => f, + None => return Ok(()), + }; - let stream_id = match self.tracks.get(&fragment.track_id) { - // Close the old stream. - Some(stream_id) if fragment.keyframe => { - self.streams.send(conn, *stream_id, &[], true)?; - None - } + let stream_id = match self.tracks.get(&fragment.track_id) { + // Close the old stream. + Some(stream_id) if fragment.keyframe => { + self.streams.send(conn, *stream_id, &[], true)?; + None + } - // Use the existing stream - Some(stream_id) => Some(*stream_id), + // Use the existing stream + Some(stream_id) => Some(*stream_id), - // No existing stream. - _ => None, - }; + // No existing stream. + _ => None, + }; - let stream_id = match stream_id { - // Use the existing stream, - Some(stream_id) => stream_id, + let stream_id = match stream_id { + // Use the existing stream, + Some(stream_id) => stream_id, - // Open a new stream. - None => { - // Create a new unidirectional stream. - let stream_id = session.open_stream(conn, false)?; + // Open a new stream. + None => { + // Create a new unidirectional stream. + let stream_id = session.open_stream(conn, false)?; - // Set the stream priority to be equal to the timestamp. - // We subtract from u64::MAX so newer media is sent important. - // TODO prioritize audio - let order = u64::MAX - fragment.timestamp; - self.streams.send_order(conn, stream_id, order); + // Set the stream priority to be equal to the timestamp. + // We subtract from u64::MAX so newer media is sent important. + // TODO prioritize audio + let order = u64::MAX - fragment.timestamp; + self.streams.send_order(conn, stream_id, order); - // Encode a JSON header indicating this is a new track. - let mut message: message::Message = message::Message::new(); - message.segment = Some(message::Segment { - track_id: fragment.track_id, - }); + // Encode a JSON header indicating this is a new track. + let mut message: message::Message = message::Message::new(); + message.segment = Some(message::Segment { + track_id: fragment.track_id, + }); - // Write the header. - let data = message.serialize()?; - self.streams.send(conn, stream_id, &data, false)?; + // Write the header. + let data = message.serialize()?; + self.streams.send(conn, stream_id, &data, false)?; - // Keep a mapping from the track id to the current stream id. - self.tracks.insert(fragment.track_id, stream_id); + // Keep a mapping from the track id to the current stream id. + self.tracks.insert(fragment.track_id, stream_id); - stream_id - } - }; + stream_id + } + }; - // Write the current fragment. - let data = fragment.data.as_slice(); - self.streams.send(conn, stream_id, data, false)?; + // Write the current fragment. + let data = fragment.data.as_slice(); + self.streams.send(conn, stream_id, data, false)?; - Ok(()) - } + Ok(()) + } } diff --git a/server/src/transport/app.rs b/server/src/transport/app.rs index 9024448..2c5ae3f 100644 --- a/server/src/transport/app.rs +++ b/server/src/transport/app.rs @@ -3,10 +3,6 @@ use std::time; use quiche::h3::webtransport; pub trait App: Default { - fn poll( - &mut self, - conn: &mut quiche::Connection, - session: &mut webtransport::ServerSession, - ) -> anyhow::Result<()>; - fn timeout(&self) -> Option; + fn poll(&mut self, conn: &mut quiche::Connection, session: &mut webtransport::ServerSession) -> anyhow::Result<()>; + fn timeout(&self) -> Option; } diff --git a/server/src/transport/connection.rs b/server/src/transport/connection.rs index e9766dc..5d806f5 100644 --- a/server/src/transport/connection.rs +++ b/server/src/transport/connection.rs @@ -9,7 +9,7 @@ use super::app; pub type Map = hmap::HashMap>; pub struct Connection { - pub quiche: quiche::Connection, - pub session: Option, - pub app: T, + pub quiche: quiche::Connection, + pub session: Option, + pub app: T, } diff --git a/server/src/transport/server.rs b/server/src/transport/server.rs index ed2dee3..cb6854c 100644 --- a/server/src/transport/server.rs +++ b/server/src/transport/server.rs @@ -8,336 +8,319 @@ use super::connection; const MAX_DATAGRAM_SIZE: usize = 1350; pub struct Server { - // IO stuff - socket: mio::net::UdpSocket, - poll: mio::Poll, - events: mio::Events, + // IO stuff + socket: mio::net::UdpSocket, + poll: mio::Poll, + events: mio::Events, - // QUIC stuff - quic: quiche::Config, - seed: ring::hmac::Key, // connection ID seed + // QUIC stuff + quic: quiche::Config, + seed: ring::hmac::Key, // connection ID seed - conns: connection::Map, + conns: connection::Map, } pub struct Config { - pub addr: String, - pub cert: String, - pub key: String, + pub addr: String, + pub cert: String, + pub key: String, } impl Server { - pub fn new(config: Config) -> io::Result { - // Listen on the provided socket address - let addr = config.addr.parse().unwrap(); - let mut socket = mio::net::UdpSocket::bind(addr).unwrap(); + pub fn new(config: Config) -> io::Result { + // Listen on the provided socket address + let addr = config.addr.parse().unwrap(); + let mut socket = mio::net::UdpSocket::bind(addr).unwrap(); - // Setup the event loop. - let poll = mio::Poll::new().unwrap(); - let events = mio::Events::with_capacity(1024); + // Setup the event loop. + let poll = mio::Poll::new().unwrap(); + let events = mio::Events::with_capacity(1024); - poll.registry() - .register(&mut socket, mio::Token(0), mio::Interest::READABLE) - .unwrap(); + poll.registry() + .register(&mut socket, mio::Token(0), mio::Interest::READABLE) + .unwrap(); - // Generate random values for connection IDs. - let rng = ring::rand::SystemRandom::new(); - let seed = ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap(); + // Generate random values for connection IDs. + let rng = ring::rand::SystemRandom::new(); + let seed = ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap(); - // Create the configuration for the QUIC conns. - let mut quic = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap(); - quic.load_cert_chain_from_pem_file(&config.cert).unwrap(); - quic.load_priv_key_from_pem_file(&config.key).unwrap(); - quic.set_application_protos(quiche::h3::APPLICATION_PROTOCOL) - .unwrap(); - quic.set_max_idle_timeout(5000); - quic.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE); - quic.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE); - quic.set_initial_max_data(10_000_000); - quic.set_initial_max_stream_data_bidi_local(1_000_000); - quic.set_initial_max_stream_data_bidi_remote(1_000_000); - quic.set_initial_max_stream_data_uni(1_000_000); - quic.set_initial_max_streams_bidi(100); - quic.set_initial_max_streams_uni(100); - quic.set_disable_active_migration(true); - quic.enable_early_data(); - quic.enable_dgram(true, 65536, 65536); + // Create the configuration for the QUIC conns. + let mut quic = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap(); + quic.load_cert_chain_from_pem_file(&config.cert).unwrap(); + quic.load_priv_key_from_pem_file(&config.key).unwrap(); + quic.set_application_protos(quiche::h3::APPLICATION_PROTOCOL).unwrap(); + quic.set_max_idle_timeout(5000); + quic.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE); + quic.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE); + quic.set_initial_max_data(10_000_000); + quic.set_initial_max_stream_data_bidi_local(1_000_000); + quic.set_initial_max_stream_data_bidi_remote(1_000_000); + quic.set_initial_max_stream_data_uni(1_000_000); + quic.set_initial_max_streams_bidi(100); + quic.set_initial_max_streams_uni(100); + quic.set_disable_active_migration(true); + quic.enable_early_data(); + quic.enable_dgram(true, 65536, 65536); - let conns = Default::default(); + let conns = Default::default(); - Ok(Server { - socket, - poll, - events, + Ok(Server { + socket, + poll, + events, - quic, - seed, + quic, + seed, - conns, - }) - } + conns, + }) + } - pub fn run(&mut self) -> anyhow::Result<()> { - log::info!("listening on {}", self.socket.local_addr()?); + pub fn run(&mut self) -> anyhow::Result<()> { + log::info!("listening on {}", self.socket.local_addr()?); - loop { - self.wait()?; - self.receive()?; - self.app()?; - self.send()?; - self.cleanup(); - } - } + loop { + self.wait()?; + self.receive()?; + self.app()?; + self.send()?; + self.cleanup(); + } + } - pub fn wait(&mut self) -> anyhow::Result<()> { - // Find the shorter timeout from all the active connections. - // - // TODO: use event loop that properly supports timers - let timeout = self - .conns - .values() - .filter_map(|c| { - let timeout = c.quiche.timeout(); - let expires = c.app.timeout(); + pub fn wait(&mut self) -> anyhow::Result<()> { + // Find the shorter timeout from all the active connections. + // + // TODO: use event loop that properly supports timers + let timeout = self + .conns + .values() + .filter_map(|c| { + let timeout = c.quiche.timeout(); + let expires = c.app.timeout(); - match (timeout, expires) { - (Some(a), Some(b)) => Some(a.min(b)), - (Some(a), None) => Some(a), - (None, Some(b)) => Some(b), - (None, None) => None, - } - }) - .min(); + match (timeout, expires) { + (Some(a), Some(b)) => Some(a.min(b)), + (Some(a), None) => Some(a), + (None, Some(b)) => Some(b), + (None, None) => None, + } + }) + .min(); - self.poll.poll(&mut self.events, timeout).unwrap(); + self.poll.poll(&mut self.events, timeout).unwrap(); - // If the event loop reported no events, it means that the timeout - // has expired, so handle it without attempting to read packets. We - // will then proceed with the send loop. - if self.events.is_empty() { - for conn in self.conns.values_mut() { - conn.quiche.on_timeout(); - } - } + // If the event loop reported no events, it means that the timeout + // has expired, so handle it without attempting to read packets. We + // will then proceed with the send loop. + if self.events.is_empty() { + for conn in self.conns.values_mut() { + conn.quiche.on_timeout(); + } + } - Ok(()) - } + Ok(()) + } - // Reads packets from the socket, updating any internal connection state. - fn receive(&mut self) -> anyhow::Result<()> { - let mut src = [0; MAX_DATAGRAM_SIZE]; + // Reads packets from the socket, updating any internal connection state. + fn receive(&mut self) -> anyhow::Result<()> { + let mut src = [0; MAX_DATAGRAM_SIZE]; - // Try reading any data currently available on the socket. - loop { - let (len, from) = match self.socket.recv_from(&mut src) { - Ok(v) => v, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => return Ok(()), - Err(e) => return Err(e.into()), - }; + // Try reading any data currently available on the socket. + loop { + let (len, from) = match self.socket.recv_from(&mut src) { + Ok(v) => v, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => return Ok(()), + Err(e) => return Err(e.into()), + }; - let src = &mut src[..len]; + let src = &mut src[..len]; - let info = quiche::RecvInfo { - to: self.socket.local_addr().unwrap(), - from, - }; + let info = quiche::RecvInfo { + to: self.socket.local_addr().unwrap(), + from, + }; - // Parse the QUIC packet's header. - let hdr = quiche::Header::from_slice(src, quiche::MAX_CONN_ID_LEN).unwrap(); + // Parse the QUIC packet's header. + let hdr = quiche::Header::from_slice(src, quiche::MAX_CONN_ID_LEN).unwrap(); - let conn_id = ring::hmac::sign(&self.seed, &hdr.dcid); - let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN]; - let conn_id = conn_id.to_vec().into(); + let conn_id = ring::hmac::sign(&self.seed, &hdr.dcid); + let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN]; + let conn_id = conn_id.to_vec().into(); - // Check if it's an existing connection. - if let Some(conn) = self.conns.get_mut(&hdr.dcid) { - conn.quiche.recv(src, info)?; + // Check if it's an existing connection. + if let Some(conn) = self.conns.get_mut(&hdr.dcid) { + conn.quiche.recv(src, info)?; - if conn.session.is_none() && conn.quiche.is_established() { - conn.session = Some(webtransport::ServerSession::with_transport( - &mut conn.quiche, - )?) - } + if conn.session.is_none() && conn.quiche.is_established() { + conn.session = Some(webtransport::ServerSession::with_transport(&mut conn.quiche)?) + } - continue; - } else if let Some(conn) = self.conns.get_mut(&conn_id) { - conn.quiche.recv(src, info)?; + continue; + } else if let Some(conn) = self.conns.get_mut(&conn_id) { + conn.quiche.recv(src, info)?; - // TODO is this needed here? - if conn.session.is_none() && conn.quiche.is_established() { - conn.session = Some(webtransport::ServerSession::with_transport( - &mut conn.quiche, - )?) - } + // TODO is this needed here? + if conn.session.is_none() && conn.quiche.is_established() { + conn.session = Some(webtransport::ServerSession::with_transport(&mut conn.quiche)?) + } - continue; - } + continue; + } - if hdr.ty != quiche::Type::Initial { - log::warn!("unknown connection ID"); - continue; - } + if hdr.ty != quiche::Type::Initial { + log::warn!("unknown connection ID"); + continue; + } - let mut dst = [0; MAX_DATAGRAM_SIZE]; + let mut dst = [0; MAX_DATAGRAM_SIZE]; - if !quiche::version_is_supported(hdr.version) { - let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut dst).unwrap(); - let dst = &dst[..len]; + if !quiche::version_is_supported(hdr.version) { + let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut dst).unwrap(); + let dst = &dst[..len]; - self.socket.send_to(dst, from).unwrap(); - continue; - } + self.socket.send_to(dst, from).unwrap(); + continue; + } - let mut scid = [0; quiche::MAX_CONN_ID_LEN]; - scid.copy_from_slice(&conn_id); + let mut scid = [0; quiche::MAX_CONN_ID_LEN]; + scid.copy_from_slice(&conn_id); - let scid = quiche::ConnectionId::from_ref(&scid); + let scid = quiche::ConnectionId::from_ref(&scid); - // Token is always present in Initial packets. - let token = hdr.token.as_ref().unwrap(); + // Token is always present in Initial packets. + let token = hdr.token.as_ref().unwrap(); - // Do stateless retry if the client didn't send a token. - if token.is_empty() { - let new_token = mint_token(&hdr, &from); + // Do stateless retry if the client didn't send a token. + if token.is_empty() { + let new_token = mint_token(&hdr, &from); - let len = quiche::retry( - &hdr.scid, - &hdr.dcid, - &scid, - &new_token, - hdr.version, - &mut dst, - ) - .unwrap(); + let len = quiche::retry(&hdr.scid, &hdr.dcid, &scid, &new_token, hdr.version, &mut dst).unwrap(); - let dst = &dst[..len]; + let dst = &dst[..len]; - self.socket.send_to(dst, from).unwrap(); - continue; - } + self.socket.send_to(dst, from).unwrap(); + continue; + } - let odcid = validate_token(&from, token); + let odcid = validate_token(&from, token); - // The token was not valid, meaning the retry failed, so - // drop the packet. - if odcid.is_none() { - log::warn!("invalid token"); - continue; - } + // The token was not valid, meaning the retry failed, so + // drop the packet. + if odcid.is_none() { + log::warn!("invalid token"); + continue; + } - if scid.len() != hdr.dcid.len() { - log::warn!("invalid connection ID"); - continue; - } + if scid.len() != hdr.dcid.len() { + log::warn!("invalid connection ID"); + continue; + } - // Reuse the source connection ID we sent in the Retry packet, - // instead of changing it again. - let conn_id = hdr.dcid.clone(); - let local_addr = self.socket.local_addr().unwrap(); + // Reuse the source connection ID we sent in the Retry packet, + // instead of changing it again. + let conn_id = hdr.dcid.clone(); + let local_addr = self.socket.local_addr().unwrap(); - log::debug!("new connection: dcid={:?} scid={:?}", hdr.dcid, scid); + log::debug!("new connection: dcid={:?} scid={:?}", hdr.dcid, scid); - let mut conn = - quiche::accept(&conn_id, odcid.as_ref(), local_addr, from, &mut self.quic)?; + let mut conn = quiche::accept(&conn_id, odcid.as_ref(), local_addr, from, &mut self.quic)?; - // Log each session with QLOG if the ENV var is set. - if let Some(dir) = std::env::var_os("QLOGDIR") { - let id = format!("{:?}", &scid); + // Log each session with QLOG if the ENV var is set. + if let Some(dir) = std::env::var_os("QLOGDIR") { + let id = format!("{:?}", &scid); - let mut path = std::path::PathBuf::from(dir); - let filename = format!("server-{id}.sqlog"); - path.push(filename); + let mut path = std::path::PathBuf::from(dir); + let filename = format!("server-{id}.sqlog"); + path.push(filename); - let writer = match std::fs::File::create(&path) { - Ok(f) => std::io::BufWriter::new(f), + let writer = match std::fs::File::create(&path) { + Ok(f) => std::io::BufWriter::new(f), - Err(e) => panic!( - "Error creating qlog file attempted path was {:?}: {}", - path, e - ), - }; + Err(e) => panic!("Error creating qlog file attempted path was {:?}: {}", path, e), + }; - conn.set_qlog( - std::boxed::Box::new(writer), - "warp-server qlog".to_string(), - format!("{} id={}", "warp-server qlog", id), - ); - } + conn.set_qlog( + std::boxed::Box::new(writer), + "warp-server qlog".to_string(), + format!("{} id={}", "warp-server qlog", id), + ); + } - // Process potentially coalesced packets. - conn.recv(src, info)?; + // Process potentially coalesced packets. + conn.recv(src, info)?; - let user = connection::Connection { - quiche: conn, - session: None, - app: T::default(), - }; + let user = connection::Connection { + quiche: conn, + session: None, + app: T::default(), + }; - self.conns.insert(conn_id, user); - } - } + self.conns.insert(conn_id, user); + } + } - pub fn app(&mut self) -> anyhow::Result<()> { - for conn in self.conns.values_mut() { - if conn.quiche.is_closed() { - continue; - } + pub fn app(&mut self) -> anyhow::Result<()> { + for conn in self.conns.values_mut() { + if conn.quiche.is_closed() { + continue; + } - if let Some(session) = &mut conn.session { - if let Err(e) = conn.app.poll(&mut conn.quiche, session) { - log::debug!("app error: {:?}", e); + if let Some(session) = &mut conn.session { + if let Err(e) = conn.app.poll(&mut conn.quiche, session) { + log::debug!("app error: {:?}", e); - // Close the connection on any application error - let reason = format!("app error: {:?}", e); - conn.quiche.close(true, 0xff, reason.as_bytes()).ok(); - } - } - } + // Close the connection on any application error + let reason = format!("app error: {:?}", e); + conn.quiche.close(true, 0xff, reason.as_bytes()).ok(); + } + } + } - Ok(()) - } + Ok(()) + } - // Generate outgoing QUIC packets for all active connections and send - // them on the UDP socket, until quiche reports that there are no more - // packets to be sent. - pub fn send(&mut self) -> anyhow::Result<()> { - for conn in self.conns.values_mut() { - let conn = &mut conn.quiche; + // Generate outgoing QUIC packets for all active connections and send + // them on the UDP socket, until quiche reports that there are no more + // packets to be sent. + pub fn send(&mut self) -> anyhow::Result<()> { + for conn in self.conns.values_mut() { + let conn = &mut conn.quiche; - if let Err(e) = send_conn(&self.socket, conn) { - log::error!("{} send failed: {:?}", conn.trace_id(), e); - conn.close(false, 0x1, b"fail").ok(); - } - } + if let Err(e) = send_conn(&self.socket, conn) { + log::error!("{} send failed: {:?}", conn.trace_id(), e); + conn.close(false, 0x1, b"fail").ok(); + } + } - Ok(()) - } + Ok(()) + } - pub fn cleanup(&mut self) { - // Garbage collect closed connections. - self.conns.retain(|_, ref mut c| !c.quiche.is_closed()); - } + pub fn cleanup(&mut self) { + // Garbage collect closed connections. + self.conns.retain(|_, ref mut c| !c.quiche.is_closed()); + } } // Send any pending packets for the connection over the socket. fn send_conn(socket: &mio::net::UdpSocket, conn: &mut quiche::Connection) -> anyhow::Result<()> { - let mut pkt = [0; MAX_DATAGRAM_SIZE]; + let mut pkt = [0; MAX_DATAGRAM_SIZE]; - loop { - let (size, info) = match conn.send(&mut pkt) { - Ok(v) => v, - Err(quiche::Error::Done) => return Ok(()), - Err(e) => return Err(e.into()), - }; + loop { + let (size, info) = match conn.send(&mut pkt) { + Ok(v) => v, + Err(quiche::Error::Done) => return Ok(()), + Err(e) => return Err(e.into()), + }; - let pkt = &pkt[..size]; + let pkt = &pkt[..size]; - match socket.send_to(pkt, info.to) { - Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(()), - Err(e) => return Err(e.into()), - Ok(_) => (), - } - } + match socket.send_to(pkt, info.to) { + Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(()), + Err(e) => return Err(e.into()), + Ok(_) => (), + } + } } /// Generate a stateless retry token. @@ -349,19 +332,19 @@ fn send_conn(socket: &mio::net::UdpSocket, conn: &mut quiche::Connection) -> any /// Note that this function is only an example and doesn't do any cryptographic /// authenticate of the token. *It should not be used in production system*. fn mint_token(hdr: &quiche::Header, src: &std::net::SocketAddr) -> Vec { - let mut token = Vec::new(); + let mut token = Vec::new(); - token.extend_from_slice(b"quiche"); + token.extend_from_slice(b"quiche"); - let addr = match src.ip() { - std::net::IpAddr::V4(a) => a.octets().to_vec(), - std::net::IpAddr::V6(a) => a.octets().to_vec(), - }; + let addr = match src.ip() { + std::net::IpAddr::V4(a) => a.octets().to_vec(), + std::net::IpAddr::V6(a) => a.octets().to_vec(), + }; - token.extend_from_slice(&addr); - token.extend_from_slice(&hdr.dcid); + token.extend_from_slice(&addr); + token.extend_from_slice(&hdr.dcid); - token + token } /// Validates a stateless retry token. @@ -371,28 +354,25 @@ fn mint_token(hdr: &quiche::Header, src: &std::net::SocketAddr) -> Vec { /// /// Note that this function is only an example and doesn't do any cryptographic /// authenticate of the token. *It should not be used in production system*. -fn validate_token<'a>( - src: &std::net::SocketAddr, - token: &'a [u8], -) -> Option> { - if token.len() < 6 { - return None; - } +fn validate_token<'a>(src: &std::net::SocketAddr, token: &'a [u8]) -> Option> { + if token.len() < 6 { + return None; + } - if &token[..6] != b"quiche" { - return None; - } + if &token[..6] != b"quiche" { + return None; + } - let token = &token[6..]; + let token = &token[6..]; - let addr = match src.ip() { - std::net::IpAddr::V4(a) => a.octets().to_vec(), - std::net::IpAddr::V6(a) => a.octets().to_vec(), - }; + let addr = match src.ip() { + std::net::IpAddr::V4(a) => a.octets().to_vec(), + std::net::IpAddr::V6(a) => a.octets().to_vec(), + }; - if token.len() < addr.len() || &token[..addr.len()] != addr.as_slice() { - return None; - } + if token.len() < addr.len() || &token[..addr.len()] != addr.as_slice() { + return None; + } - Some(quiche::ConnectionId::from_ref(&token[addr.len()..])) + Some(quiche::ConnectionId::from_ref(&token[addr.len()..])) } diff --git a/server/src/transport/streams.rs b/server/src/transport/streams.rs index de9bdd9..862da0a 100644 --- a/server/src/transport/streams.rs +++ b/server/src/transport/streams.rs @@ -5,145 +5,132 @@ use quiche; #[derive(Default)] pub struct Streams { - ordered: Vec, + ordered: Vec, } struct Stream { - id: u64, - order: u64, + id: u64, + order: u64, - buffer: VecDeque, - fin: bool, + buffer: VecDeque, + fin: bool, } impl Streams { - // Write the data to the given stream, buffering it if needed. - pub fn send( - &mut self, - conn: &mut quiche::Connection, - id: u64, - buf: &[u8], - fin: bool, - ) -> anyhow::Result<()> { - if buf.is_empty() && !fin { - return Ok(()); - } + // Write the data to the given stream, buffering it if needed. + pub fn send(&mut self, conn: &mut quiche::Connection, id: u64, buf: &[u8], fin: bool) -> anyhow::Result<()> { + if buf.is_empty() && !fin { + return Ok(()); + } - // Get the index of the stream, or add it to the list of streams. - let pos = self - .ordered - .iter() - .position(|s| s.id == id) - .unwrap_or_else(|| { - // Create a new stream - let stream = Stream { - id, - buffer: VecDeque::new(), - fin: false, - order: 0, // Default to highest priority until send_order is called. - }; + // Get the index of the stream, or add it to the list of streams. + let pos = self.ordered.iter().position(|s| s.id == id).unwrap_or_else(|| { + // Create a new stream + let stream = Stream { + id, + buffer: VecDeque::new(), + fin: false, + order: 0, // Default to highest priority until send_order is called. + }; - self.insert(conn, stream) - }); + self.insert(conn, stream) + }); - let stream = &mut self.ordered[pos]; + let stream = &mut self.ordered[pos]; - // Check if we've already closed the stream, just in case. - if stream.fin && !buf.is_empty() { - anyhow::bail!("stream is already finished"); - } + // Check if we've already closed the stream, just in case. + if stream.fin && !buf.is_empty() { + anyhow::bail!("stream is already finished"); + } - // If there's no data buffered, try to write it immediately. - let size = if stream.buffer.is_empty() { - match conn.stream_send(id, buf, fin) { - Ok(size) => size, - Err(quiche::Error::Done) => 0, - Err(e) => anyhow::bail!(e), - } - } else { - 0 - }; + // If there's no data buffered, try to write it immediately. + let size = if stream.buffer.is_empty() { + match conn.stream_send(id, buf, fin) { + Ok(size) => size, + Err(quiche::Error::Done) => 0, + Err(e) => anyhow::bail!(e), + } + } else { + 0 + }; - if size < buf.len() { - // Short write, save the rest for later. - stream.buffer.extend(&buf[size..]); - } + if size < buf.len() { + // Short write, save the rest for later. + stream.buffer.extend(&buf[size..]); + } - stream.fin |= fin; + stream.fin |= fin; - Ok(()) - } + Ok(()) + } - // Flush any pending stream data. - pub fn poll(&mut self, conn: &mut quiche::Connection) { - self.ordered.retain_mut(|s| s.poll(conn).is_ok()); - } + // Flush any pending stream data. + pub fn poll(&mut self, conn: &mut quiche::Connection) { + self.ordered.retain_mut(|s| s.poll(conn).is_ok()); + } - // Set the send order of the stream. - pub fn send_order(&mut self, conn: &mut quiche::Connection, id: u64, order: u64) { - let mut stream = match self.ordered.iter().position(|s| s.id == id) { - // Remove the stream from the existing list. - Some(pos) => self.ordered.remove(pos), + // Set the send order of the stream. + pub fn send_order(&mut self, conn: &mut quiche::Connection, id: u64, order: u64) { + let mut stream = match self.ordered.iter().position(|s| s.id == id) { + // Remove the stream from the existing list. + Some(pos) => self.ordered.remove(pos), - // This is a new stream, insert it into the list. - None => Stream { - id, - buffer: VecDeque::new(), - fin: false, - order, - }, - }; + // This is a new stream, insert it into the list. + None => Stream { + id, + buffer: VecDeque::new(), + fin: false, + order, + }, + }; - stream.order = order; + stream.order = order; - self.insert(conn, stream); - } + self.insert(conn, stream); + } - fn insert(&mut self, conn: &mut quiche::Connection, stream: Stream) -> usize { - // Look for the position to insert the stream. - let pos = match self - .ordered - .binary_search_by_key(&stream.order, |s| s.order) - { - Ok(pos) | Err(pos) => pos, - }; + fn insert(&mut self, conn: &mut quiche::Connection, stream: Stream) -> usize { + // Look for the position to insert the stream. + let pos = match self.ordered.binary_search_by_key(&stream.order, |s| s.order) { + Ok(pos) | Err(pos) => pos, + }; - self.ordered.insert(pos, stream); + self.ordered.insert(pos, stream); - // Reprioritize all later streams. - // TODO we can avoid this if stream_priorty takes a u64 - for (i, stream) in self.ordered[pos..].iter().enumerate() { - _ = conn.stream_priority(stream.id, (pos + i) as u8, true); - } + // Reprioritize all later streams. + // TODO we can avoid this if stream_priorty takes a u64 + for (i, stream) in self.ordered[pos..].iter().enumerate() { + _ = conn.stream_priority(stream.id, (pos + i) as u8, true); + } - pos - } + pos + } } impl Stream { - fn poll(&mut self, conn: &mut quiche::Connection) -> quiche::Result<()> { - // Keep reading from the buffer until it's empty. - while !self.buffer.is_empty() { - // VecDeque is a ring buffer, so we can't write the whole thing at once. - let parts = self.buffer.as_slices(); + fn poll(&mut self, conn: &mut quiche::Connection) -> quiche::Result<()> { + // Keep reading from the buffer until it's empty. + while !self.buffer.is_empty() { + // VecDeque is a ring buffer, so we can't write the whole thing at once. + let parts = self.buffer.as_slices(); - let size = conn.stream_send(self.id, parts.0, false)?; - if size == 0 { - // No more space available for this stream. - return Ok(()); - } + let size = conn.stream_send(self.id, parts.0, false)?; + if size == 0 { + // No more space available for this stream. + return Ok(()); + } - // Remove the bytes that were written. - self.buffer.drain(..size); - } + // Remove the bytes that were written. + self.buffer.drain(..size); + } - if self.fin { - // Write the stream done signal. - conn.stream_send(self.id, &[], true)?; + if self.fin { + // Write the stream done signal. + conn.stream_send(self.id, &[], true)?; - Err(quiche::Error::Done) - } else { - Ok(()) - } - } + Err(quiche::Error::Done) + } else { + Ok(()) + } + } } diff --git a/web/.eslintrc.cjs b/web/.eslintrc.cjs index 61796f9..b76eb47 100644 --- a/web/.eslintrc.cjs +++ b/web/.eslintrc.cjs @@ -1,26 +1,23 @@ /* eslint-env node */ module.exports = { - extends: [ - "eslint:recommended", - "plugin:@typescript-eslint/recommended", - "prettier", - ], - parser: "@typescript-eslint/parser", - plugins: ["@typescript-eslint"], - root: true, - ignorePatterns: ["dist", "node_modules"], - rules: { - "@typescript-eslint/ban-ts-comment": "off", - "@typescript-eslint/no-non-null-assertion": "off", - "@typescript-eslint/no-explicit-any": "off", - "no-unused-vars": "off", // note you must disable the base rule as it can report incorrect errors - "@typescript-eslint/no-unused-vars": [ - "warn", // or "error" - { - argsIgnorePattern: "^_", - varsIgnorePattern: "^_", - caughtErrorsIgnorePattern: "^_", - }, - ], - }, + extends: ["eslint:recommended", "plugin:@typescript-eslint/recommended", "prettier"], + parser: "@typescript-eslint/parser", + plugins: ["@typescript-eslint", "prettier"], + root: true, + ignorePatterns: ["dist", "node_modules"], + rules: { + "@typescript-eslint/ban-ts-comment": "off", + "@typescript-eslint/no-non-null-assertion": "off", + "@typescript-eslint/no-explicit-any": "off", + "no-unused-vars": "off", // note you must disable the base rule as it can report incorrect errors + "@typescript-eslint/no-unused-vars": [ + "warn", // or "error" + { + argsIgnorePattern: "^_", + varsIgnorePattern: "^_", + caughtErrorsIgnorePattern: "^_", + }, + ], + "prettier/prettier": 2, // Means error + }, } diff --git a/web/.prettierrc.json b/web/.prettierrc.json deleted file mode 100644 index cce9d3c..0000000 --- a/web/.prettierrc.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "semi": false -} diff --git a/web/.prettierrc.yaml b/web/.prettierrc.yaml new file mode 100644 index 0000000..708c01a --- /dev/null +++ b/web/.prettierrc.yaml @@ -0,0 +1,4 @@ +# note: root .editorconfig is used + +# Don't insert semi-colons unless needed +semi: false diff --git a/web/.proxyrc.js b/web/.proxyrc.js index 1295b7d..3554d52 100644 --- a/web/.proxyrc.js +++ b/web/.proxyrc.js @@ -1,7 +1,7 @@ module.exports = function (app) { - app.use((req, res, next) => { - res.setHeader("Cross-Origin-Opener-Policy", "same-origin") - res.setHeader("Cross-Origin-Embedder-Policy", "require-corp") - next() - }) + app.use((req, res, next) => { + res.setHeader("Cross-Origin-Opener-Policy", "same-origin") + res.setHeader("Cross-Origin-Embedder-Policy", "require-corp") + next() + }) } diff --git a/web/package.json b/web/package.json index 0cdf4ab..0121fcd 100644 --- a/web/package.json +++ b/web/package.json @@ -1,27 +1,28 @@ { - "license": "Apache-2.0", - "source": "src/index.html", - "scripts": { - "serve": "parcel serve --https --cert ../cert/localhost.crt --key ../cert/localhost.key --port 4444 --open", - "build": "parcel build", - "check": "tsc --noEmit", - "lint": "eslint .", - "fmt": "prettier --write ." - }, - "devDependencies": { - "@parcel/transformer-inline-string": "2.8.3", - "@parcel/validator-typescript": "^2.6.0", - "@types/audioworklet": "^0.0.41", - "@types/dom-webcodecs": "^0.1.6", - "@typescript-eslint/eslint-plugin": "^5.59.7", - "@typescript-eslint/parser": "^5.59.7", - "eslint": "^8.41.0", - "eslint-config-prettier": "^8.8.0", - "parcel": "^2.8.0", - "prettier": "^2.8.8", - "typescript": "^5.0.4" - }, - "dependencies": { - "mp4box": "^0.5.2" - } + "license": "Apache-2.0", + "source": "src/index.html", + "scripts": { + "serve": "parcel serve --https --cert ../cert/localhost.crt --key ../cert/localhost.key --port 4444 --open", + "build": "parcel build", + "check": "tsc --noEmit", + "lint": "eslint .", + "fmt": "prettier --write ." + }, + "devDependencies": { + "@parcel/transformer-inline-string": "2.8.3", + "@parcel/validator-typescript": "^2.6.0", + "@types/audioworklet": "^0.0.41", + "@types/dom-webcodecs": "^0.1.6", + "@typescript-eslint/eslint-plugin": "^5.59.7", + "@typescript-eslint/parser": "^5.59.7", + "eslint": "^8.41.0", + "eslint-config-prettier": "^8.8.0", + "eslint-plugin-prettier": "^4.2.1", + "parcel": "^2.8.0", + "prettier": "^2.8.8", + "typescript": "^5.0.4" + }, + "dependencies": { + "mp4box": "^0.5.2" + } } diff --git a/web/src/broadcaster/encoder.ts b/web/src/broadcaster/encoder.ts index 76ee774..4ba7ff7 100644 --- a/web/src/broadcaster/encoder.ts +++ b/web/src/broadcaster/encoder.ts @@ -1,104 +1,104 @@ import * as MP4 from "../mp4" export class Encoder { - container: MP4.ISOFile - audio: AudioEncoder - video: VideoEncoder + container: MP4.ISOFile + audio: AudioEncoder + video: VideoEncoder - constructor() { - this.container = new MP4.ISOFile() + constructor() { + this.container = new MP4.ISOFile() - this.audio = new AudioEncoder({ - output: this.onAudio.bind(this), - error: console.warn, - }) + this.audio = new AudioEncoder({ + output: this.onAudio.bind(this), + error: console.warn, + }) - this.video = new VideoEncoder({ - output: this.onVideo.bind(this), - error: console.warn, - }) + this.video = new VideoEncoder({ + output: this.onVideo.bind(this), + error: console.warn, + }) - this.container.init() + this.container.init() - this.audio.configure({ - codec: "mp4a.40.2", - numberOfChannels: 2, - sampleRate: 44100, + this.audio.configure({ + codec: "mp4a.40.2", + numberOfChannels: 2, + sampleRate: 44100, - // TODO bitrate - }) + // TODO bitrate + }) - this.video.configure({ - codec: "avc1.42002A", // TODO h.264 baseline - avc: { format: "avc" }, // or annexb - width: 1280, - height: 720, + this.video.configure({ + codec: "avc1.42002A", // TODO h.264 baseline + avc: { format: "avc" }, // or annexb + width: 1280, + height: 720, - // TODO bitrate - // TODO bitrateMode - // TODO framerate - // TODO latencyMode - }) - } + // TODO bitrate + // TODO bitrateMode + // TODO framerate + // TODO latencyMode + }) + } - onAudio(frame: EncodedAudioChunk, metadata: EncodedAudioChunkMetadata) { - const config = metadata.decoderConfig! - const track_id = 1 + onAudio(frame: EncodedAudioChunk, metadata: EncodedAudioChunkMetadata) { + const config = metadata.decoderConfig! + const track_id = 1 - if (!this.container.getTrackById(track_id)) { - this.container.addTrack({ - id: track_id, - type: "mp4a", // TODO wrong - timescale: 1000, // TODO verify + if (!this.container.getTrackById(track_id)) { + this.container.addTrack({ + id: track_id, + type: "mp4a", // TODO wrong + timescale: 1000, // TODO verify - channel_count: config.numberOfChannels, - samplerate: config.sampleRate, + channel_count: config.numberOfChannels, + samplerate: config.sampleRate, - description: config.description, // TODO verify - // TODO description_boxes?: Box[]; - }) - } + description: config.description, // TODO verify + // TODO description_boxes?: Box[]; + }) + } - const buffer = new Uint8Array(frame.byteLength) - frame.copyTo(buffer) + const buffer = new Uint8Array(frame.byteLength) + frame.copyTo(buffer) - // TODO cts? - const sample = this.container.addSample(track_id, buffer, { - is_sync: frame.type == "key", - duration: frame.duration!, - dts: frame.timestamp, - }) + // TODO cts? + const sample = this.container.addSample(track_id, buffer, { + is_sync: frame.type == "key", + duration: frame.duration!, + dts: frame.timestamp, + }) - const _stream = this.container.createSingleSampleMoof(sample) - } + const _stream = this.container.createSingleSampleMoof(sample) + } - onVideo(frame: EncodedVideoChunk, metadata?: EncodedVideoChunkMetadata) { - const config = metadata!.decoderConfig! - const track_id = 2 + onVideo(frame: EncodedVideoChunk, metadata?: EncodedVideoChunkMetadata) { + const config = metadata!.decoderConfig! + const track_id = 2 - if (!this.container.getTrackById(track_id)) { - this.container.addTrack({ - id: 2, - type: "avc1", - width: config.codedWidth, - height: config.codedHeight, - timescale: 1000, // TODO verify + if (!this.container.getTrackById(track_id)) { + this.container.addTrack({ + id: 2, + type: "avc1", + width: config.codedWidth, + height: config.codedHeight, + timescale: 1000, // TODO verify - description: config.description, // TODO verify - // TODO description_boxes?: Box[]; - }) - } + description: config.description, // TODO verify + // TODO description_boxes?: Box[]; + }) + } - const buffer = new Uint8Array(frame.byteLength) - frame.copyTo(buffer) + const buffer = new Uint8Array(frame.byteLength) + frame.copyTo(buffer) - // TODO cts? - const sample = this.container.addSample(track_id, buffer, { - is_sync: frame.type == "key", - duration: frame.duration!, - dts: frame.timestamp, - }) + // TODO cts? + const sample = this.container.addSample(track_id, buffer, { + is_sync: frame.type == "key", + duration: frame.duration!, + dts: frame.timestamp, + }) - const _stream = this.container.createSingleSampleMoof(sample) - } + const _stream = this.container.createSingleSampleMoof(sample) + } } diff --git a/web/src/broadcaster/index.ts b/web/src/broadcaster/index.ts index f17e9af..fa40b01 100644 --- a/web/src/broadcaster/index.ts +++ b/web/src/broadcaster/index.ts @@ -1,5 +1,5 @@ export default class Broadcaster { - constructor() { - // TODO - } + constructor() { + // TODO + } } diff --git a/web/src/index.css b/web/src/index.css index 871df82..c3d1639 100644 --- a/web/src/index.css +++ b/web/src/index.css @@ -1,75 +1,75 @@ html, body, #player { - width: 100%; + width: 100%; } body { - background: #000000; - color: #ffffff; - padding: 0; - margin: 0; - display: flex; - justify-content: center; - font-family: sans-serif; + background: #000000; + color: #ffffff; + padding: 0; + margin: 0; + display: flex; + justify-content: center; + font-family: sans-serif; } #screen { - position: relative; + position: relative; } #screen #play { - position: absolute; - width: 100%; - height: 100%; - background: rgba(0, 0, 0, 0.5); + position: absolute; + width: 100%; + height: 100%; + background: rgba(0, 0, 0, 0.5); - display: flex; - justify-content: center; - align-items: center; + display: flex; + justify-content: center; + align-items: center; - z-index: 1; + z-index: 1; } #controls { - display: flex; - flex-wrap: wrap; - padding: 8px 16px; + display: flex; + flex-wrap: wrap; + padding: 8px 16px; } #controls > * { - margin-right: 8px; + margin-right: 8px; } #controls label { - margin-right: 8px; + margin-right: 8px; } #stats { - display: grid; - grid-template-columns: auto 1fr; + display: grid; + grid-template-columns: auto 1fr; } #stats label { - padding: 0 1rem; + padding: 0 1rem; } .buffer { - position: relative; - width: 100%; + position: relative; + width: 100%; } .buffer .fill { - position: absolute; - transition-duration: 0.1s; - transition-property: left, right, background-color; - background-color: RebeccaPurple; - height: 100%; - text-align: right; - padding-right: 0.5rem; - overflow: hidden; + position: absolute; + transition-duration: 0.1s; + transition-property: left, right, background-color; + background-color: RebeccaPurple; + height: 100%; + text-align: right; + padding-right: 0.5rem; + overflow: hidden; } .buffer .fill.net { - background-color: Purple; + background-color: Purple; } diff --git a/web/src/index.html b/web/src/index.html index 67ddd41..4171b09 100644 --- a/web/src/index.html +++ b/web/src/index.html @@ -1,33 +1,33 @@  - - - WARP + + + WARP - - + + - -
-
-
click to play
- -
+ +
+
+
click to play
+ +
-
- - -
+
+ + +
-
- -
+
+ +
- -
-
-
+ +
+
+
- - + + diff --git a/web/src/index.ts b/web/src/index.ts index f0f3b21..5c04336 100644 --- a/web/src/index.ts +++ b/web/src/index.ts @@ -7,7 +7,7 @@ import fingerprintHex from "bundle-text:../fingerprint.hex" // Convert the hex to binary. const fingerprint = [] for (let c = 0; c < fingerprintHex.length - 1; c += 2) { - fingerprint.push(parseInt(fingerprintHex.substring(c, c + 2), 16)) + fingerprint.push(parseInt(fingerprintHex.substring(c, c + 2), 16)) } const params = new URLSearchParams(window.location.search) @@ -16,27 +16,27 @@ const url = params.get("url") || "https://localhost:4443/watch" const canvas = document.querySelector("canvas#video")! const transport = new Transport({ - url: url, - fingerprint: { - // TODO remove when Chrome accepts the system CA - algorithm: "sha-256", - value: new Uint8Array(fingerprint), - }, + url: url, + fingerprint: { + // TODO remove when Chrome accepts the system CA + algorithm: "sha-256", + value: new Uint8Array(fingerprint), + }, }) const player = new Player({ - transport, - canvas: canvas.transferControlToOffscreen(), + transport, + canvas: canvas.transferControlToOffscreen(), }) const play = document.querySelector("#screen #play")! const playFunc = (e: Event) => { - player.play() - e.preventDefault() + player.play() + e.preventDefault() - play.removeEventListener("click", playFunc) - play.style.display = "none" + play.removeEventListener("click", playFunc) + play.style.display = "none" } play.addEventListener("click", playFunc) diff --git a/web/src/mp4/index.ts b/web/src/mp4/index.ts index 0a6a50a..8c4cd6f 100644 --- a/web/src/mp4/index.ts +++ b/web/src/mp4/index.ts @@ -1,16 +1,16 @@ // Rename some stuff so it's on brand. export { - createFile as New, - MP4File as File, - MP4ArrayBuffer as ArrayBuffer, - MP4Info as Info, - MP4Track as Track, - MP4AudioTrack as AudioTrack, - MP4VideoTrack as VideoTrack, - DataStream as Stream, - Box, - ISOFile, - Sample, + createFile as New, + MP4File as File, + MP4ArrayBuffer as ArrayBuffer, + MP4Info as Info, + MP4Track as Track, + MP4AudioTrack as AudioTrack, + MP4VideoTrack as VideoTrack, + DataStream as Stream, + Box, + ISOFile, + Sample, } from "mp4box" export { Init, InitParser } from "./init" diff --git a/web/src/mp4/init.ts b/web/src/mp4/init.ts index 70b885b..5ff2802 100644 --- a/web/src/mp4/init.ts +++ b/web/src/mp4/init.ts @@ -1,43 +1,43 @@ import * as MP4 from "./index" export interface Init { - raw: MP4.ArrayBuffer - info: MP4.Info + raw: MP4.ArrayBuffer + info: MP4.Info } export class InitParser { - mp4box: MP4.File - offset: number + mp4box: MP4.File + offset: number - raw: MP4.ArrayBuffer[] - info: Promise + raw: MP4.ArrayBuffer[] + info: Promise - constructor() { - this.mp4box = MP4.New() - this.raw = [] - this.offset = 0 + constructor() { + this.mp4box = MP4.New() + this.raw = [] + this.offset = 0 - // Create a promise that gets resolved once the init segment has been parsed. - this.info = new Promise((resolve, reject) => { - this.mp4box.onError = reject - this.mp4box.onReady = resolve - }) - } + // Create a promise that gets resolved once the init segment has been parsed. + this.info = new Promise((resolve, reject) => { + this.mp4box.onError = reject + this.mp4box.onReady = resolve + }) + } - push(data: Uint8Array) { - // Make a copy of the atom because mp4box only accepts an ArrayBuffer unfortunately - const box = new Uint8Array(data.byteLength) - box.set(data) + push(data: Uint8Array) { + // Make a copy of the atom because mp4box only accepts an ArrayBuffer unfortunately + const box = new Uint8Array(data.byteLength) + box.set(data) - // and for some reason we need to modify the underlying ArrayBuffer with fileStart - const buffer = box.buffer as MP4.ArrayBuffer - buffer.fileStart = this.offset + // and for some reason we need to modify the underlying ArrayBuffer with fileStart + const buffer = box.buffer as MP4.ArrayBuffer + buffer.fileStart = this.offset - // Parse the data - this.offset = this.mp4box.appendBuffer(buffer) - this.mp4box.flush() + // Parse the data + this.offset = this.mp4box.appendBuffer(buffer) + this.mp4box.flush() - // Add the box to our queue of chunks - this.raw.push(buffer) - } + // Add the box to our queue of chunks + this.raw.push(buffer) + } } diff --git a/web/src/mp4/mp4box.d.ts b/web/src/mp4/mp4box.d.ts index 4c07e19..607e55f 100644 --- a/web/src/mp4/mp4box.d.ts +++ b/web/src/mp4/mp4box.d.ts @@ -1,239 +1,231 @@ // https://github.com/gpac/mp4box.js/issues/233 declare module "mp4box" { - export interface MP4MediaTrack { - id: number - created: Date - modified: Date - movie_duration: number - layer: number - alternate_group: number - volume: number - track_width: number - track_height: number - timescale: number - duration: number - bitrate: number - codec: string - language: string - nb_samples: number - } + export interface MP4MediaTrack { + id: number + created: Date + modified: Date + movie_duration: number + layer: number + alternate_group: number + volume: number + track_width: number + track_height: number + timescale: number + duration: number + bitrate: number + codec: string + language: string + nb_samples: number + } - export interface MP4VideoData { - width: number - height: number - } + export interface MP4VideoData { + width: number + height: number + } - export interface MP4VideoTrack extends MP4MediaTrack { - video: MP4VideoData - } + export interface MP4VideoTrack extends MP4MediaTrack { + video: MP4VideoData + } - export interface MP4AudioData { - sample_rate: number - channel_count: number - sample_size: number - } + export interface MP4AudioData { + sample_rate: number + channel_count: number + sample_size: number + } - export interface MP4AudioTrack extends MP4MediaTrack { - audio: MP4AudioData - } + export interface MP4AudioTrack extends MP4MediaTrack { + audio: MP4AudioData + } - export type MP4Track = MP4VideoTrack | MP4AudioTrack + export type MP4Track = MP4VideoTrack | MP4AudioTrack - export interface MP4Info { - duration: number - timescale: number - fragment_duration: number - isFragmented: boolean - isProgressive: boolean - hasIOD: boolean - brands: string[] - created: Date - modified: Date - tracks: MP4Track[] - mime: string - audioTracks: MP4AudioTrack[] - videoTracks: MP4VideoTrack[] - } + export interface MP4Info { + duration: number + timescale: number + fragment_duration: number + isFragmented: boolean + isProgressive: boolean + hasIOD: boolean + brands: string[] + created: Date + modified: Date + tracks: MP4Track[] + mime: string + audioTracks: MP4AudioTrack[] + videoTracks: MP4VideoTrack[] + } - export type MP4ArrayBuffer = ArrayBuffer & { fileStart: number } + export type MP4ArrayBuffer = ArrayBuffer & { fileStart: number } - export interface MP4File { - onMoovStart?: () => void - onReady?: (info: MP4Info) => void - onError?: (e: string) => void - onSamples?: (id: number, user: any, samples: Sample[]) => void + export interface MP4File { + onMoovStart?: () => void + onReady?: (info: MP4Info) => void + onError?: (e: string) => void + onSamples?: (id: number, user: any, samples: Sample[]) => void - appendBuffer(data: MP4ArrayBuffer): number - start(): void - stop(): void - flush(): void + appendBuffer(data: MP4ArrayBuffer): number + start(): void + stop(): void + flush(): void - setExtractionOptions( - id: number, - user: any, - options: ExtractionOptions - ): void - } + setExtractionOptions(id: number, user: any, options: ExtractionOptions): void + } - export function createFile(): MP4File + export function createFile(): MP4File - export interface Sample { - number: number - track_id: number - timescale: number - description_index: number - description: any - data: ArrayBuffer - size: number - alreadyRead?: number - duration: number - cts: number - dts: number - is_sync: boolean - is_leading: number - depends_on: number - is_depended_on: number - has_redundancy: number - degration_priority: number - offset: number - subsamples: any - } + export interface Sample { + number: number + track_id: number + timescale: number + description_index: number + description: any + data: ArrayBuffer + size: number + alreadyRead?: number + duration: number + cts: number + dts: number + is_sync: boolean + is_leading: number + depends_on: number + is_depended_on: number + has_redundancy: number + degration_priority: number + offset: number + subsamples: any + } - export interface ExtractionOptions { - nbSamples: number - } + export interface ExtractionOptions { + nbSamples: number + } - const BIG_ENDIAN: boolean - const LITTLE_ENDIAN: boolean + const BIG_ENDIAN: boolean + const LITTLE_ENDIAN: boolean - export class DataStream { - constructor( - buffer?: ArrayBuffer, - byteOffset?: number, - littleEndian?: boolean - ) - getPosition(): number + export class DataStream { + constructor(buffer?: ArrayBuffer, byteOffset?: number, littleEndian?: boolean) + getPosition(): number - get byteLength(): number - get buffer(): ArrayBuffer - set buffer(v: ArrayBuffer) - get byteOffset(): number - set byteOffset(v: number) - get dataView(): DataView - set dataView(v: DataView) + get byteLength(): number + get buffer(): ArrayBuffer + set buffer(v: ArrayBuffer) + get byteOffset(): number + set byteOffset(v: number) + get dataView(): DataView + set dataView(v: DataView) - seek(pos: number): void - isEof(): boolean + seek(pos: number): void + isEof(): boolean - mapUint8Array(length: number): Uint8Array - readInt32Array(length: number, littleEndian: boolean): Int32Array - readInt16Array(length: number, littleEndian: boolean): Int16Array - readInt8Array(length: number): Int8Array - readUint32Array(length: number, littleEndian: boolean): Uint32Array - readUint16Array(length: number, littleEndian: boolean): Uint16Array - readUint8Array(length: number): Uint8Array - readFloat64Array(length: number, littleEndian: boolean): Float64Array - readFloat32Array(length: number, littleEndian: boolean): Float32Array + mapUint8Array(length: number): Uint8Array + readInt32Array(length: number, littleEndian: boolean): Int32Array + readInt16Array(length: number, littleEndian: boolean): Int16Array + readInt8Array(length: number): Int8Array + readUint32Array(length: number, littleEndian: boolean): Uint32Array + readUint16Array(length: number, littleEndian: boolean): Uint16Array + readUint8Array(length: number): Uint8Array + readFloat64Array(length: number, littleEndian: boolean): Float64Array + readFloat32Array(length: number, littleEndian: boolean): Float32Array - readInt32(littleEndian: boolean): number - readInt16(littleEndian: boolean): number - readInt8(): number - readUint32(littleEndian: boolean): number - readUint16(littleEndian: boolean): number - readUint8(): number - readFloat32(littleEndian: boolean): number - readFloat64(littleEndian: boolean): number + readInt32(littleEndian: boolean): number + readInt16(littleEndian: boolean): number + readInt8(): number + readUint32(littleEndian: boolean): number + readUint16(littleEndian: boolean): number + readUint8(): number + readFloat32(littleEndian: boolean): number + readFloat64(littleEndian: boolean): number - endianness: boolean + endianness: boolean - memcpy( - dst: ArrayBufferLike, - dstOffset: number, - src: ArrayBufferLike, - srcOffset: number, - byteLength: number - ): void + memcpy( + dst: ArrayBufferLike, + dstOffset: number, + src: ArrayBufferLike, + srcOffset: number, + byteLength: number + ): void - // TODO I got bored porting the remaining functions - } + // TODO I got bored porting the remaining functions + } - export class Box { - write(stream: DataStream): void - } + export class Box { + write(stream: DataStream): void + } - export interface TrackOptions { - id?: number - type?: string - width?: number - height?: number - duration?: number - layer?: number - timescale?: number - media_duration?: number - language?: string - hdlr?: string + export interface TrackOptions { + id?: number + type?: string + width?: number + height?: number + duration?: number + layer?: number + timescale?: number + media_duration?: number + language?: string + hdlr?: string - // video - avcDecoderConfigRecord?: any + // video + avcDecoderConfigRecord?: any - // audio - balance?: number - channel_count?: number - samplesize?: number - samplerate?: number + // audio + balance?: number + channel_count?: number + samplesize?: number + samplerate?: number - //captions - namespace?: string - schema_location?: string - auxiliary_mime_types?: string + //captions + namespace?: string + schema_location?: string + auxiliary_mime_types?: string - description?: any - description_boxes?: Box[] + description?: any + description_boxes?: Box[] - default_sample_description_index_id?: number - default_sample_duration?: number - default_sample_size?: number - default_sample_flags?: number - } + default_sample_description_index_id?: number + default_sample_duration?: number + default_sample_size?: number + default_sample_flags?: number + } - export interface FileOptions { - brands?: string[] - timescale?: number - rate?: number - duration?: number - width?: number - } + export interface FileOptions { + brands?: string[] + timescale?: number + rate?: number + duration?: number + width?: number + } - export interface SampleOptions { - sample_description_index?: number - duration?: number - cts?: number - dts?: number - is_sync?: boolean - is_leading?: number - depends_on?: number - is_depended_on?: number - has_redundancy?: number - degradation_priority?: number - subsamples?: any - } + export interface SampleOptions { + sample_description_index?: number + duration?: number + cts?: number + dts?: number + is_sync?: boolean + is_leading?: number + depends_on?: number + is_depended_on?: number + has_redundancy?: number + degradation_priority?: number + subsamples?: any + } - // TODO add the remaining functions - // TODO move to another module - export class ISOFile { - constructor(stream?: DataStream) + // TODO add the remaining functions + // TODO move to another module + export class ISOFile { + constructor(stream?: DataStream) - init(options?: FileOptions): ISOFile - addTrack(options?: TrackOptions): number - addSample(track: number, data: ArrayBuffer, options?: SampleOptions): Sample + init(options?: FileOptions): ISOFile + addTrack(options?: TrackOptions): number + addSample(track: number, data: ArrayBuffer, options?: SampleOptions): Sample - createSingleSampleMoof(sample: Sample): Box + createSingleSampleMoof(sample: Sample): Box - // helpers - getTrackById(id: number): Box | undefined - getTrexById(id: number): Box | undefined - } + // helpers + getTrackById(id: number): Box | undefined + getTrexById(id: number): Box | undefined + } - export {} + export {} } diff --git a/web/src/player/audio.ts b/web/src/player/audio.ts index 50309f1..f989dff 100644 --- a/web/src/player/audio.ts +++ b/web/src/player/audio.ts @@ -2,81 +2,78 @@ import * as Message from "./message" import { Ring } from "./ring" export default class Audio { - ring?: Ring - queue: Array + ring?: Ring + queue: Array - render?: number // non-zero if requestAnimationFrame has been called - last?: number // the timestamp of the last rendered frame, in microseconds + render?: number // non-zero if requestAnimationFrame has been called + last?: number // the timestamp of the last rendered frame, in microseconds - constructor(_config: Message.Config) { - this.queue = [] - } + constructor(_config: Message.Config) { + this.queue = [] + } - push(frame: AudioData) { - // Drop any old frames - if (this.last && frame.timestamp <= this.last) { - frame.close() - return - } + push(frame: AudioData) { + // Drop any old frames + if (this.last && frame.timestamp <= this.last) { + frame.close() + return + } - // Insert the frame into the queue sorted by timestamp. - if ( - this.queue.length > 0 && - this.queue[this.queue.length - 1].timestamp <= frame.timestamp - ) { - // Fast path because we normally append to the end. - this.queue.push(frame) - } else { - // Do a full binary search - let low = 0 - let high = this.queue.length + // Insert the frame into the queue sorted by timestamp. + if (this.queue.length > 0 && this.queue[this.queue.length - 1].timestamp <= frame.timestamp) { + // Fast path because we normally append to the end. + this.queue.push(frame) + } else { + // Do a full binary search + let low = 0 + let high = this.queue.length - while (low < high) { - const mid = (low + high) >>> 1 - if (this.queue[mid].timestamp < frame.timestamp) low = mid + 1 - else high = mid - } + while (low < high) { + const mid = (low + high) >>> 1 + if (this.queue[mid].timestamp < frame.timestamp) low = mid + 1 + else high = mid + } - this.queue.splice(low, 0, frame) - } + this.queue.splice(low, 0, frame) + } - this.emit() - } + this.emit() + } - emit() { - const ring = this.ring - if (!ring) { - return - } + emit() { + const ring = this.ring + if (!ring) { + return + } - while (this.queue.length) { - const frame = this.queue[0] - if (ring.size() + frame.numberOfFrames > ring.capacity) { - // Buffer is full - break - } + while (this.queue.length) { + const frame = this.queue[0] + if (ring.size() + frame.numberOfFrames > ring.capacity) { + // Buffer is full + break + } - const size = ring.write(frame) - if (size < frame.numberOfFrames) { - throw new Error("audio buffer is full") - } + const size = ring.write(frame) + if (size < frame.numberOfFrames) { + throw new Error("audio buffer is full") + } - this.last = frame.timestamp + this.last = frame.timestamp - frame.close() - this.queue.shift() - } - } + frame.close() + this.queue.shift() + } + } - play(play: Message.Play) { - this.ring = new Ring(play.buffer) + play(play: Message.Play) { + this.ring = new Ring(play.buffer) - if (!this.render) { - const sampleRate = 44100 // TODO dynamic + if (!this.render) { + const sampleRate = 44100 // TODO dynamic - // Refresh every half buffer - const refresh = ((play.buffer.capacity / sampleRate) * 1000) / 2 - this.render = setInterval(this.emit.bind(this), refresh) - } - } + // Refresh every half buffer + const refresh = ((play.buffer.capacity / sampleRate) * 1000) / 2 + this.render = setInterval(this.emit.bind(this), refresh) + } + } } diff --git a/web/src/player/decoder.ts b/web/src/player/decoder.ts index 56bfba7..b6df5ea 100644 --- a/web/src/player/decoder.ts +++ b/web/src/player/decoder.ts @@ -5,175 +5,167 @@ import * as Stream from "../stream" import Renderer from "./renderer" export default class Decoder { - init: MP4.InitParser - decoders: Map - renderer: Renderer + init: MP4.InitParser + decoders: Map + renderer: Renderer - constructor(renderer: Renderer) { - this.init = new MP4.InitParser() - this.decoders = new Map() - this.renderer = renderer - } + constructor(renderer: Renderer) { + this.init = new MP4.InitParser() + this.decoders = new Map() + this.renderer = renderer + } - async receiveInit(msg: Message.Init) { - const stream = new Stream.Reader(msg.reader, msg.buffer) - for (;;) { - const data = await stream.read() - if (!data) break + async receiveInit(msg: Message.Init) { + const stream = new Stream.Reader(msg.reader, msg.buffer) + for (;;) { + const data = await stream.read() + if (!data) break - this.init.push(data) - } + this.init.push(data) + } - // TODO make sure the init segment is fully received - } + // TODO make sure the init segment is fully received + } - async receiveSegment(msg: Message.Segment) { - // Wait for the init segment to be fully received and parsed - const init = await this.init.info - const input = MP4.New() + async receiveSegment(msg: Message.Segment) { + // Wait for the init segment to be fully received and parsed + const init = await this.init.info + const input = MP4.New() - input.onSamples = this.onSamples.bind(this) - input.onReady = (track: any) => { - // Extract all of the tracks, because we don't know if it's audio or video. - for (const i of init.tracks) { - input.setExtractionOptions(track.id, i, { nbSamples: 1 }) - } + input.onSamples = this.onSamples.bind(this) + input.onReady = (track: any) => { + // Extract all of the tracks, because we don't know if it's audio or video. + for (const i of init.tracks) { + input.setExtractionOptions(track.id, i, { nbSamples: 1 }) + } - input.start() - } + input.start() + } - // MP4box requires us to reparse the init segment unfortunately - let offset = 0 + // MP4box requires us to reparse the init segment unfortunately + let offset = 0 - for (const raw of this.init.raw) { - raw.fileStart = offset - offset = input.appendBuffer(raw) - } + for (const raw of this.init.raw) { + raw.fileStart = offset + offset = input.appendBuffer(raw) + } - const stream = new Stream.Reader(msg.reader, msg.buffer) + const stream = new Stream.Reader(msg.reader, msg.buffer) - // For whatever reason, mp4box doesn't work until you read an atom at a time. - while (!(await stream.done())) { - const raw = await stream.peek(4) + // For whatever reason, mp4box doesn't work until you read an atom at a time. + while (!(await stream.done())) { + const raw = await stream.peek(4) - // TODO this doesn't support when size = 0 (until EOF) or size = 1 (extended size) - const size = new DataView( - raw.buffer, - raw.byteOffset, - raw.byteLength - ).getUint32(0) - const atom = await stream.bytes(size) + // TODO this doesn't support when size = 0 (until EOF) or size = 1 (extended size) + const size = new DataView(raw.buffer, raw.byteOffset, raw.byteLength).getUint32(0) + const atom = await stream.bytes(size) - // Make a copy of the atom because mp4box only accepts an ArrayBuffer unfortunately - const box = new Uint8Array(atom.byteLength) - box.set(atom) + // Make a copy of the atom because mp4box only accepts an ArrayBuffer unfortunately + const box = new Uint8Array(atom.byteLength) + box.set(atom) - // and for some reason we need to modify the underlying ArrayBuffer with offset - const buffer = box.buffer as MP4.ArrayBuffer - buffer.fileStart = offset + // and for some reason we need to modify the underlying ArrayBuffer with offset + const buffer = box.buffer as MP4.ArrayBuffer + buffer.fileStart = offset - // Parse the data - offset = input.appendBuffer(buffer) - input.flush() - } - } + // Parse the data + offset = input.appendBuffer(buffer) + input.flush() + } + } - onSamples(track_id: number, track: MP4.Track, samples: MP4.Sample[]) { - let decoder = this.decoders.get(track_id) + onSamples(track_id: number, track: MP4.Track, samples: MP4.Sample[]) { + let decoder = this.decoders.get(track_id) - if (!decoder) { - // We need a sample to initalize the video decoder, because of mp4box limitations. - const sample = samples[0] + if (!decoder) { + // We need a sample to initalize the video decoder, because of mp4box limitations. + const sample = samples[0] - if (isVideoTrack(track)) { - // Configure the decoder using the AVC box for H.264 - // TODO it should be easy to support other codecs, just need to know the right boxes. - const avcc = sample.description.avcC - if (!avcc) throw new Error("TODO only h264 is supported") + if (isVideoTrack(track)) { + // Configure the decoder using the AVC box for H.264 + // TODO it should be easy to support other codecs, just need to know the right boxes. + const avcc = sample.description.avcC + if (!avcc) throw new Error("TODO only h264 is supported") - const description = new MP4.Stream(new Uint8Array(avcc.size), 0, false) - avcc.write(description) + const description = new MP4.Stream(new Uint8Array(avcc.size), 0, false) + avcc.write(description) - const videoDecoder = new VideoDecoder({ - output: this.renderer.push.bind(this.renderer), - error: console.warn, - }) + const videoDecoder = new VideoDecoder({ + output: this.renderer.push.bind(this.renderer), + error: console.warn, + }) - videoDecoder.configure({ - codec: track.codec, - codedHeight: track.video.height, - codedWidth: track.video.width, - description: description.buffer?.slice(8), - // optimizeForLatency: true - }) + videoDecoder.configure({ + codec: track.codec, + codedHeight: track.video.height, + codedWidth: track.video.width, + description: description.buffer?.slice(8), + // optimizeForLatency: true + }) - decoder = videoDecoder - } else if (isAudioTrack(track)) { - const audioDecoder = new AudioDecoder({ - output: this.renderer.push.bind(this.renderer), - error: console.warn, - }) + decoder = videoDecoder + } else if (isAudioTrack(track)) { + const audioDecoder = new AudioDecoder({ + output: this.renderer.push.bind(this.renderer), + error: console.warn, + }) - audioDecoder.configure({ - codec: track.codec, - numberOfChannels: track.audio.channel_count, - sampleRate: track.audio.sample_rate, - }) + audioDecoder.configure({ + codec: track.codec, + numberOfChannels: track.audio.channel_count, + sampleRate: track.audio.sample_rate, + }) - decoder = audioDecoder - } else { - throw new Error("unknown track type") - } + decoder = audioDecoder + } else { + throw new Error("unknown track type") + } - this.decoders.set(track_id, decoder) - } + this.decoders.set(track_id, decoder) + } - for (const sample of samples) { - // Convert to microseconds - const timestamp = (1000 * 1000 * sample.dts) / sample.timescale - const duration = (1000 * 1000 * sample.duration) / sample.timescale + for (const sample of samples) { + // Convert to microseconds + const timestamp = (1000 * 1000 * sample.dts) / sample.timescale + const duration = (1000 * 1000 * sample.duration) / sample.timescale - if (isAudioDecoder(decoder)) { - decoder.decode( - new EncodedAudioChunk({ - type: sample.is_sync ? "key" : "delta", - data: sample.data, - duration: duration, - timestamp: timestamp, - }) - ) - } else if (isVideoDecoder(decoder)) { - decoder.decode( - new EncodedVideoChunk({ - type: sample.is_sync ? "key" : "delta", - data: sample.data, - duration: duration, - timestamp: timestamp, - }) - ) - } else { - throw new Error("unknown decoder type") - } - } - } + if (isAudioDecoder(decoder)) { + decoder.decode( + new EncodedAudioChunk({ + type: sample.is_sync ? "key" : "delta", + data: sample.data, + duration: duration, + timestamp: timestamp, + }) + ) + } else if (isVideoDecoder(decoder)) { + decoder.decode( + new EncodedVideoChunk({ + type: sample.is_sync ? "key" : "delta", + data: sample.data, + duration: duration, + timestamp: timestamp, + }) + ) + } else { + throw new Error("unknown decoder type") + } + } + } } -function isAudioDecoder( - decoder: AudioDecoder | VideoDecoder -): decoder is AudioDecoder { - return decoder instanceof AudioDecoder +function isAudioDecoder(decoder: AudioDecoder | VideoDecoder): decoder is AudioDecoder { + return decoder instanceof AudioDecoder } -function isVideoDecoder( - decoder: AudioDecoder | VideoDecoder -): decoder is VideoDecoder { - return decoder instanceof VideoDecoder +function isVideoDecoder(decoder: AudioDecoder | VideoDecoder): decoder is VideoDecoder { + return decoder instanceof VideoDecoder } function isAudioTrack(track: MP4.Track): track is MP4.AudioTrack { - return (track as MP4.AudioTrack).audio !== undefined + return (track as MP4.AudioTrack).audio !== undefined } function isVideoTrack(track: MP4.Track): track is MP4.VideoTrack { - return (track as MP4.VideoTrack).video !== undefined + return (track as MP4.VideoTrack).video !== undefined } diff --git a/web/src/player/index.ts b/web/src/player/index.ts index 00c476f..3ead68d 100644 --- a/web/src/player/index.ts +++ b/web/src/player/index.ts @@ -3,89 +3,86 @@ import * as Ring from "./ring" import Transport from "../transport" export interface Config { - transport: Transport - canvas: OffscreenCanvas + transport: Transport + canvas: OffscreenCanvas } // This class must be created on the main thread due to AudioContext. export default class Player { - context: AudioContext - worker: Worker - worklet: Promise + context: AudioContext + worker: Worker + worklet: Promise - transport: Transport + transport: Transport - constructor(config: Config) { - this.transport = config.transport - this.transport.callback = this + constructor(config: Config) { + this.transport = config.transport + this.transport.callback = this - this.context = new AudioContext({ - latencyHint: "interactive", - sampleRate: 44100, - }) + this.context = new AudioContext({ + latencyHint: "interactive", + sampleRate: 44100, + }) - this.worker = this.setupWorker(config) - this.worklet = this.setupWorklet(config) - } + this.worker = this.setupWorker(config) + this.worklet = this.setupWorklet(config) + } - private setupWorker(config: Config): Worker { - const url = new URL("worker.ts", import.meta.url) + private setupWorker(config: Config): Worker { + const url = new URL("worker.ts", import.meta.url) - const worker = new Worker(url, { - type: "module", - name: "media", - }) + const worker = new Worker(url, { + type: "module", + name: "media", + }) - const msg = { - canvas: config.canvas, - } + const msg = { + canvas: config.canvas, + } - worker.postMessage({ config: msg }, [msg.canvas]) + worker.postMessage({ config: msg }, [msg.canvas]) - return worker - } + return worker + } - private async setupWorklet(_config: Config): Promise { - // Load the worklet source code. - const url = new URL("worklet.ts", import.meta.url) - await this.context.audioWorklet.addModule(url) + private async setupWorklet(_config: Config): Promise { + // Load the worklet source code. + const url = new URL("worklet.ts", import.meta.url) + await this.context.audioWorklet.addModule(url) - const volume = this.context.createGain() - volume.gain.value = 2.0 + const volume = this.context.createGain() + volume.gain.value = 2.0 - // Create a worklet - const worklet = new AudioWorkletNode(this.context, "renderer") - worklet.onprocessorerror = (e: Event) => { - console.error("Audio worklet error:", e) - } + // Create a worklet + const worklet = new AudioWorkletNode(this.context, "renderer") + worklet.onprocessorerror = (e: Event) => { + console.error("Audio worklet error:", e) + } - // Connect the worklet to the volume node and then to the speakers - worklet.connect(volume) - volume.connect(this.context.destination) + // Connect the worklet to the volume node and then to the speakers + worklet.connect(volume) + volume.connect(this.context.destination) - return worklet - } + return worklet + } - onInit(init: Message.Init) { - this.worker.postMessage({ init }, [init.buffer.buffer, init.reader]) - } + onInit(init: Message.Init) { + this.worker.postMessage({ init }, [init.buffer.buffer, init.reader]) + } - onSegment(segment: Message.Segment) { - this.worker.postMessage({ segment }, [ - segment.buffer.buffer, - segment.reader, - ]) - } + onSegment(segment: Message.Segment) { + this.worker.postMessage({ segment }, [segment.buffer.buffer, segment.reader]) + } - async play() { - this.context.resume() + async play() { + this.context.resume() - const play = { - buffer: new Ring.Buffer(2, 44100 / 10), // 100ms of audio - } + const play = { + buffer: new Ring.Buffer(2, 44100 / 10), // 100ms of audio + } - const worklet = await this.worklet - worklet.port.postMessage({ play }) - this.worker.postMessage({ play }) - } + const worklet = await this.worklet + worklet.port.postMessage({ play }) + this.worker.postMessage({ play }) + } } diff --git a/web/src/player/message.ts b/web/src/player/message.ts index 3ed3993..2a82acc 100644 --- a/web/src/player/message.ts +++ b/web/src/player/message.ts @@ -1,21 +1,21 @@ import * as Ring from "./ring" export interface Config { - // video stuff - canvas: OffscreenCanvas + // video stuff + canvas: OffscreenCanvas } export interface Init { - buffer: Uint8Array // unread buffered data - reader: ReadableStream // unread unbuffered data + buffer: Uint8Array // unread buffered data + reader: ReadableStream // unread unbuffered data } export interface Segment { - buffer: Uint8Array // unread buffered data - reader: ReadableStream // unread unbuffered data + buffer: Uint8Array // unread buffered data + reader: ReadableStream // unread unbuffered data } export interface Play { - timestamp?: number - buffer: Ring.Buffer + timestamp?: number + buffer: Ring.Buffer } diff --git a/web/src/player/renderer.ts b/web/src/player/renderer.ts index 9b063a1..a74ac68 100644 --- a/web/src/player/renderer.ts +++ b/web/src/player/renderer.ts @@ -3,34 +3,34 @@ import Audio from "./audio" import Video from "./video" export default class Renderer { - audio: Audio - video: Video + audio: Audio + video: Video - constructor(config: Message.Config) { - this.audio = new Audio(config) - this.video = new Video(config) - } + constructor(config: Message.Config) { + this.audio = new Audio(config) + this.video = new Video(config) + } - push(frame: AudioData | VideoFrame) { - if (isAudioData(frame)) { - this.audio.push(frame) - } else if (isVideoFrame(frame)) { - this.video.push(frame) - } else { - throw new Error("unknown frame type") - } - } + push(frame: AudioData | VideoFrame) { + if (isAudioData(frame)) { + this.audio.push(frame) + } else if (isVideoFrame(frame)) { + this.video.push(frame) + } else { + throw new Error("unknown frame type") + } + } - play(play: Message.Play) { - this.audio.play(play) - this.video.play(play) - } + play(play: Message.Play) { + this.audio.play(play) + this.video.play(play) + } } function isAudioData(frame: AudioData | VideoFrame): frame is AudioData { - return frame instanceof AudioData + return frame instanceof AudioData } function isVideoFrame(frame: AudioData | VideoFrame): frame is VideoFrame { - return frame instanceof VideoFrame + return frame instanceof VideoFrame } diff --git a/web/src/player/ring.ts b/web/src/player/ring.ts index bb28b0d..8d27faf 100644 --- a/web/src/player/ring.ts +++ b/web/src/player/ring.ts @@ -1,159 +1,155 @@ // Ring buffer with audio samples. enum STATE { - READ_POS = 0, // The current read position - WRITE_POS, // The current write position - LENGTH, // Clever way of saving the total number of enums values. + READ_POS = 0, // The current read position + WRITE_POS, // The current write position + LENGTH, // Clever way of saving the total number of enums values. } // No prototype to make this easier to send via postMessage export class Buffer { - state: SharedArrayBuffer + state: SharedArrayBuffer - channels: SharedArrayBuffer[] - capacity: number + channels: SharedArrayBuffer[] + capacity: number - constructor(channels: number, capacity: number) { - // Store the current state in a separate ring buffer. - this.state = new SharedArrayBuffer( - STATE.LENGTH * Int32Array.BYTES_PER_ELEMENT - ) + constructor(channels: number, capacity: number) { + // Store the current state in a separate ring buffer. + this.state = new SharedArrayBuffer(STATE.LENGTH * Int32Array.BYTES_PER_ELEMENT) - // Create a buffer for each audio channel - this.channels = [] - for (let i = 0; i < channels; i += 1) { - const buffer = new SharedArrayBuffer( - capacity * Float32Array.BYTES_PER_ELEMENT - ) - this.channels.push(buffer) - } + // Create a buffer for each audio channel + this.channels = [] + for (let i = 0; i < channels; i += 1) { + const buffer = new SharedArrayBuffer(capacity * Float32Array.BYTES_PER_ELEMENT) + this.channels.push(buffer) + } - this.capacity = capacity - } + this.capacity = capacity + } } export class Ring { - state: Int32Array - channels: Float32Array[] - capacity: number + state: Int32Array + channels: Float32Array[] + capacity: number - constructor(buffer: Buffer) { - this.state = new Int32Array(buffer.state) + constructor(buffer: Buffer) { + this.state = new Int32Array(buffer.state) - this.channels = [] - for (const channel of buffer.channels) { - this.channels.push(new Float32Array(channel)) - } + this.channels = [] + for (const channel of buffer.channels) { + this.channels.push(new Float32Array(channel)) + } - this.capacity = buffer.capacity - } + this.capacity = buffer.capacity + } - // Write samples for single audio frame, returning the total number written. - write(frame: AudioData): number { - const readPos = Atomics.load(this.state, STATE.READ_POS) - const writePos = Atomics.load(this.state, STATE.WRITE_POS) + // Write samples for single audio frame, returning the total number written. + write(frame: AudioData): number { + const readPos = Atomics.load(this.state, STATE.READ_POS) + const writePos = Atomics.load(this.state, STATE.WRITE_POS) - const startPos = writePos - let endPos = writePos + frame.numberOfFrames + const startPos = writePos + let endPos = writePos + frame.numberOfFrames - if (endPos > readPos + this.capacity) { - endPos = readPos + this.capacity - if (endPos <= startPos) { - // No space to write - return 0 - } - } + if (endPos > readPos + this.capacity) { + endPos = readPos + this.capacity + if (endPos <= startPos) { + // No space to write + return 0 + } + } - const startIndex = startPos % this.capacity - const endIndex = endPos % this.capacity + const startIndex = startPos % this.capacity + const endIndex = endPos % this.capacity - // Loop over each channel - for (let i = 0; i < this.channels.length; i += 1) { - const channel = this.channels[i] + // Loop over each channel + for (let i = 0; i < this.channels.length; i += 1) { + const channel = this.channels[i] - if (startIndex < endIndex) { - // One continuous range to copy. - const full = channel.subarray(startIndex, endIndex) + if (startIndex < endIndex) { + // One continuous range to copy. + const full = channel.subarray(startIndex, endIndex) - frame.copyTo(full, { - planeIndex: i, - frameCount: endIndex - startIndex, - }) - } else { - const first = channel.subarray(startIndex) - const second = channel.subarray(0, endIndex) + frame.copyTo(full, { + planeIndex: i, + frameCount: endIndex - startIndex, + }) + } else { + const first = channel.subarray(startIndex) + const second = channel.subarray(0, endIndex) - frame.copyTo(first, { - planeIndex: i, - frameCount: first.length, - }) + frame.copyTo(first, { + planeIndex: i, + frameCount: first.length, + }) - // We need this conditional when startIndex == 0 and endIndex == 0 - // When capacity=4410 and frameCount=1024, this was happening 52s into the audio. - if (second.length) { - frame.copyTo(second, { - planeIndex: i, - frameOffset: first.length, - frameCount: second.length, - }) - } - } - } + // We need this conditional when startIndex == 0 and endIndex == 0 + // When capacity=4410 and frameCount=1024, this was happening 52s into the audio. + if (second.length) { + frame.copyTo(second, { + planeIndex: i, + frameOffset: first.length, + frameCount: second.length, + }) + } + } + } - Atomics.store(this.state, STATE.WRITE_POS, endPos) + Atomics.store(this.state, STATE.WRITE_POS, endPos) - return endPos - startPos - } + return endPos - startPos + } - read(dst: Float32Array[]): number { - const readPos = Atomics.load(this.state, STATE.READ_POS) - const writePos = Atomics.load(this.state, STATE.WRITE_POS) + read(dst: Float32Array[]): number { + const readPos = Atomics.load(this.state, STATE.READ_POS) + const writePos = Atomics.load(this.state, STATE.WRITE_POS) - const startPos = readPos - let endPos = startPos + dst[0].length + const startPos = readPos + let endPos = startPos + dst[0].length - if (endPos > writePos) { - endPos = writePos - if (endPos <= startPos) { - // Nothing to read - return 0 - } - } + if (endPos > writePos) { + endPos = writePos + if (endPos <= startPos) { + // Nothing to read + return 0 + } + } - const startIndex = startPos % this.capacity - const endIndex = endPos % this.capacity + const startIndex = startPos % this.capacity + const endIndex = endPos % this.capacity - // Loop over each channel - for (let i = 0; i < dst.length; i += 1) { - if (i >= this.channels.length) { - // ignore excess channels - } + // Loop over each channel + for (let i = 0; i < dst.length; i += 1) { + if (i >= this.channels.length) { + // ignore excess channels + } - const input = this.channels[i] - const output = dst[i] + const input = this.channels[i] + const output = dst[i] - if (startIndex < endIndex) { - const full = input.subarray(startIndex, endIndex) - output.set(full) - } else { - const first = input.subarray(startIndex) - const second = input.subarray(0, endIndex) + if (startIndex < endIndex) { + const full = input.subarray(startIndex, endIndex) + output.set(full) + } else { + const first = input.subarray(startIndex) + const second = input.subarray(0, endIndex) - output.set(first) - output.set(second, first.length) - } - } + output.set(first) + output.set(second, first.length) + } + } - Atomics.store(this.state, STATE.READ_POS, endPos) + Atomics.store(this.state, STATE.READ_POS, endPos) - return endPos - startPos - } + return endPos - startPos + } - size() { - // TODO is this thread safe? - const readPos = Atomics.load(this.state, STATE.READ_POS) - const writePos = Atomics.load(this.state, STATE.WRITE_POS) + size() { + // TODO is this thread safe? + const readPos = Atomics.load(this.state, STATE.READ_POS) + const writePos = Atomics.load(this.state, STATE.WRITE_POS) - return writePos - readPos - } + return writePos - readPos + } } diff --git a/web/src/player/video.ts b/web/src/player/video.ts index b118b3b..8b4a18d 100644 --- a/web/src/player/video.ts +++ b/web/src/player/video.ts @@ -1,101 +1,98 @@ import * as Message from "./message" export default class Video { - canvas: OffscreenCanvas - queue: Array + canvas: OffscreenCanvas + queue: Array - render: number // non-zero if requestAnimationFrame has been called - sync?: number // the wall clock value for timestamp 0, in microseconds - last?: number // the timestamp of the last rendered frame, in microseconds + render: number // non-zero if requestAnimationFrame has been called + sync?: number // the wall clock value for timestamp 0, in microseconds + last?: number // the timestamp of the last rendered frame, in microseconds - constructor(config: Message.Config) { - this.canvas = config.canvas - this.queue = [] + constructor(config: Message.Config) { + this.canvas = config.canvas + this.queue = [] - this.render = 0 - } + this.render = 0 + } - push(frame: VideoFrame) { - // Drop any old frames - if (this.last && frame.timestamp <= this.last) { - frame.close() - return - } + push(frame: VideoFrame) { + // Drop any old frames + if (this.last && frame.timestamp <= this.last) { + frame.close() + return + } - // Insert the frame into the queue sorted by timestamp. - if ( - this.queue.length > 0 && - this.queue[this.queue.length - 1].timestamp <= frame.timestamp - ) { - // Fast path because we normally append to the end. - this.queue.push(frame) - } else { - // Do a full binary search - let low = 0 - let high = this.queue.length + // Insert the frame into the queue sorted by timestamp. + if (this.queue.length > 0 && this.queue[this.queue.length - 1].timestamp <= frame.timestamp) { + // Fast path because we normally append to the end. + this.queue.push(frame) + } else { + // Do a full binary search + let low = 0 + let high = this.queue.length - while (low < high) { - const mid = (low + high) >>> 1 - if (this.queue[mid].timestamp < frame.timestamp) low = mid + 1 - else high = mid - } + while (low < high) { + const mid = (low + high) >>> 1 + if (this.queue[mid].timestamp < frame.timestamp) low = mid + 1 + else high = mid + } - this.queue.splice(low, 0, frame) - } - } + this.queue.splice(low, 0, frame) + } + } - draw(now: number) { - // Draw and then queue up the next draw call. - this.drawOnce(now) + draw(now: number) { + // Draw and then queue up the next draw call. + this.drawOnce(now) - // Queue up the new draw frame. - this.render = self.requestAnimationFrame(this.draw.bind(this)) - } + // Queue up the new draw frame. + this.render = self.requestAnimationFrame(this.draw.bind(this)) + } - drawOnce(now: number) { - // Convert to microseconds - now *= 1000 + drawOnce(now: number) { + // Convert to microseconds + now *= 1000 - if (!this.queue.length) { - return - } + if (!this.queue.length) { + return + } - let frame = this.queue[0] + let frame = this.queue[0] - if (!this.sync) { - this.sync = now - frame.timestamp - } + if (!this.sync) { + this.sync = now - frame.timestamp + } - // Determine the target timestamp. - const target = now - this.sync + // Determine the target timestamp. + const target = now - this.sync - if (frame.timestamp >= target) { - // nothing to render yet, wait for the next animation frame - return - } + if (frame.timestamp >= target) { + // nothing to render yet, wait for the next animation frame + return + } - this.queue.shift() + this.queue.shift() - // Check if we should skip some frames - while (this.queue.length) { - const next = this.queue[0] - if (next.timestamp > target) break + // Check if we should skip some frames + while (this.queue.length) { + const next = this.queue[0] + if (next.timestamp > target) break - frame.close() - frame = this.queue.shift()! - } + frame.close() + frame = this.queue.shift()! + } - const ctx = this.canvas.getContext("2d") - ctx!.drawImage(frame, 0, 0, this.canvas.width, this.canvas.height) // TODO aspect ratio + const ctx = this.canvas.getContext("2d") + ctx!.drawImage(frame, 0, 0, this.canvas.width, this.canvas.height) // TODO aspect ratio - this.last = frame.timestamp - frame.close() - } + this.last = frame.timestamp + frame.close() + } - play(_play: Message.Play) { - // Queue up to render the next frame. - if (!this.render) { - this.render = self.requestAnimationFrame(this.draw.bind(this)) - } - } + play(_play: Message.Play) { + // Queue up to render the next frame. + if (!this.render) { + this.render = self.requestAnimationFrame(this.draw.bind(this)) + } + } } diff --git a/web/src/player/worker.ts b/web/src/player/worker.ts index 064889d..f30dd22 100644 --- a/web/src/player/worker.ts +++ b/web/src/player/worker.ts @@ -6,19 +6,19 @@ let decoder: Decoder let renderer: Renderer self.addEventListener("message", async (e: MessageEvent) => { - if (e.data.config) { - const config = e.data.config as Message.Config + if (e.data.config) { + const config = e.data.config as Message.Config - renderer = new Renderer(config) - decoder = new Decoder(renderer) - } else if (e.data.init) { - const init = e.data.init as Message.Init - await decoder.receiveInit(init) - } else if (e.data.segment) { - const segment = e.data.segment as Message.Segment - await decoder.receiveSegment(segment) - } else if (e.data.play) { - const play = e.data.play as Message.Play - await renderer.play(play) - } + renderer = new Renderer(config) + decoder = new Decoder(renderer) + } else if (e.data.init) { + const init = e.data.init as Message.Init + await decoder.receiveInit(init) + } else if (e.data.segment) { + const segment = e.data.segment as Message.Segment + await decoder.receiveSegment(segment) + } else if (e.data.play) { + const play = e.data.play as Message.Play + await renderer.play(play) + } }) diff --git a/web/src/player/worklet.ts b/web/src/player/worklet.ts index 9f4f6d3..77759e9 100644 --- a/web/src/player/worklet.ts +++ b/web/src/player/worklet.ts @@ -7,51 +7,47 @@ import * as Message from "./message" import { Ring } from "./ring" class Renderer extends AudioWorkletProcessor { - ring?: Ring - base: number + ring?: Ring + base: number - constructor(_params: AudioWorkletNodeOptions) { - // The super constructor call is required. - super() + constructor(_params: AudioWorkletNodeOptions) { + // The super constructor call is required. + super() - this.base = 0 - this.port.onmessage = this.onMessage.bind(this) - } + this.base = 0 + this.port.onmessage = this.onMessage.bind(this) + } - onMessage(e: MessageEvent) { - if (e.data.play) { - this.onPlay(e.data.play) - } - } + onMessage(e: MessageEvent) { + if (e.data.play) { + this.onPlay(e.data.play) + } + } - onPlay(play: Message.Play) { - this.ring = new Ring(play.buffer) - } + onPlay(play: Message.Play) { + this.ring = new Ring(play.buffer) + } - // Inputs and outputs in groups of 128 samples. - process( - inputs: Float32Array[][], - outputs: Float32Array[][], - _parameters: Record - ): boolean { - if (!this.ring) { - // Paused - return true - } + // Inputs and outputs in groups of 128 samples. + process(inputs: Float32Array[][], outputs: Float32Array[][], _parameters: Record): boolean { + if (!this.ring) { + // Paused + return true + } - if (inputs.length != 1 && outputs.length != 1) { - throw new Error("only a single track is supported") - } + if (inputs.length != 1 && outputs.length != 1) { + throw new Error("only a single track is supported") + } - const output = outputs[0] + const output = outputs[0] - const size = this.ring.read(output) - if (size < output.length) { - // TODO trigger rebuffering event - } + const size = this.ring.read(output) + if (size < output.length) { + // TODO trigger rebuffering event + } - return true - } + return true + } } registerProcessor("renderer", Renderer) diff --git a/web/src/stream/reader.ts b/web/src/stream/reader.ts index f2bc0e0..06adf2a 100644 --- a/web/src/stream/reader.ts +++ b/web/src/stream/reader.ts @@ -1,210 +1,195 @@ // Reader wraps a stream and provides convience methods for reading pieces from a stream export default class Reader { - reader: ReadableStream - buffer: Uint8Array + reader: ReadableStream + buffer: Uint8Array - constructor(reader: ReadableStream, buffer: Uint8Array = new Uint8Array(0)) { - this.reader = reader - this.buffer = buffer - } + constructor(reader: ReadableStream, buffer: Uint8Array = new Uint8Array(0)) { + this.reader = reader + this.buffer = buffer + } - // Returns any number of bytes - async read(): Promise { - if (this.buffer.byteLength) { - const buffer = this.buffer - this.buffer = new Uint8Array() - return buffer - } + // Returns any number of bytes + async read(): Promise { + if (this.buffer.byteLength) { + const buffer = this.buffer + this.buffer = new Uint8Array() + return buffer + } - const r = this.reader.getReader() - const result = await r.read() + const r = this.reader.getReader() + const result = await r.read() - r.releaseLock() + r.releaseLock() - return result.value - } + return result.value + } - async readAll(): Promise { - const r = this.reader.getReader() + async readAll(): Promise { + const r = this.reader.getReader() - for (;;) { - const result = await r.read() - if (result.done) { - break - } + for (;;) { + const result = await r.read() + if (result.done) { + break + } - const buffer = new Uint8Array(result.value) + const buffer = new Uint8Array(result.value) - if (this.buffer.byteLength == 0) { - this.buffer = buffer - } else { - const temp = new Uint8Array(this.buffer.byteLength + buffer.byteLength) - temp.set(this.buffer) - temp.set(buffer, this.buffer.byteLength) - this.buffer = temp - } - } + if (this.buffer.byteLength == 0) { + this.buffer = buffer + } else { + const temp = new Uint8Array(this.buffer.byteLength + buffer.byteLength) + temp.set(this.buffer) + temp.set(buffer, this.buffer.byteLength) + this.buffer = temp + } + } - const result = this.buffer - this.buffer = new Uint8Array() + const result = this.buffer + this.buffer = new Uint8Array() - r.releaseLock() + r.releaseLock() - return result - } + return result + } - async bytes(size: number): Promise { - const r = this.reader.getReader() + async bytes(size: number): Promise { + const r = this.reader.getReader() - while (this.buffer.byteLength < size) { - const result = await r.read() - if (result.done) { - throw "short buffer" - } + while (this.buffer.byteLength < size) { + const result = await r.read() + if (result.done) { + throw "short buffer" + } - const buffer = new Uint8Array(result.value) + const buffer = new Uint8Array(result.value) - if (this.buffer.byteLength == 0) { - this.buffer = buffer - } else { - const temp = new Uint8Array(this.buffer.byteLength + buffer.byteLength) - temp.set(this.buffer) - temp.set(buffer, this.buffer.byteLength) - this.buffer = temp - } - } + if (this.buffer.byteLength == 0) { + this.buffer = buffer + } else { + const temp = new Uint8Array(this.buffer.byteLength + buffer.byteLength) + temp.set(this.buffer) + temp.set(buffer, this.buffer.byteLength) + this.buffer = temp + } + } - const result = new Uint8Array( - this.buffer.buffer, - this.buffer.byteOffset, - size - ) - this.buffer = new Uint8Array( - this.buffer.buffer, - this.buffer.byteOffset + size - ) + const result = new Uint8Array(this.buffer.buffer, this.buffer.byteOffset, size) + this.buffer = new Uint8Array(this.buffer.buffer, this.buffer.byteOffset + size) - r.releaseLock() + r.releaseLock() - return result - } + return result + } - async peek(size: number): Promise { - const r = this.reader.getReader() + async peek(size: number): Promise { + const r = this.reader.getReader() - while (this.buffer.byteLength < size) { - const result = await r.read() - if (result.done) { - throw "short buffer" - } + while (this.buffer.byteLength < size) { + const result = await r.read() + if (result.done) { + throw "short buffer" + } - const buffer = new Uint8Array(result.value) + const buffer = new Uint8Array(result.value) - if (this.buffer.byteLength == 0) { - this.buffer = buffer - } else { - const temp = new Uint8Array(this.buffer.byteLength + buffer.byteLength) - temp.set(this.buffer) - temp.set(buffer, this.buffer.byteLength) - this.buffer = temp - } - } + if (this.buffer.byteLength == 0) { + this.buffer = buffer + } else { + const temp = new Uint8Array(this.buffer.byteLength + buffer.byteLength) + temp.set(this.buffer) + temp.set(buffer, this.buffer.byteLength) + this.buffer = temp + } + } - const result = new Uint8Array( - this.buffer.buffer, - this.buffer.byteOffset, - size - ) + const result = new Uint8Array(this.buffer.buffer, this.buffer.byteOffset, size) - r.releaseLock() + r.releaseLock() - return result - } + return result + } - async view(size: number): Promise { - const buf = await this.bytes(size) - return new DataView(buf.buffer, buf.byteOffset, buf.byteLength) - } + async view(size: number): Promise { + const buf = await this.bytes(size) + return new DataView(buf.buffer, buf.byteOffset, buf.byteLength) + } - async uint8(): Promise { - const view = await this.view(1) - return view.getUint8(0) - } + async uint8(): Promise { + const view = await this.view(1) + return view.getUint8(0) + } - async uint16(): Promise { - const view = await this.view(2) - return view.getUint16(0) - } + async uint16(): Promise { + const view = await this.view(2) + return view.getUint16(0) + } - async uint32(): Promise { - const view = await this.view(4) - return view.getUint32(0) - } + async uint32(): Promise { + const view = await this.view(4) + return view.getUint32(0) + } - // Returns a Number using 52-bits, the max Javascript can use for integer math - async uint52(): Promise { - const v = await this.uint64() - if (v > Number.MAX_SAFE_INTEGER) { - throw "overflow" - } + // Returns a Number using 52-bits, the max Javascript can use for integer math + async uint52(): Promise { + const v = await this.uint64() + if (v > Number.MAX_SAFE_INTEGER) { + throw "overflow" + } - return Number(v) - } + return Number(v) + } - // Returns a Number using 52-bits, the max Javascript can use for integer math - async vint52(): Promise { - const v = await this.vint64() - if (v > Number.MAX_SAFE_INTEGER) { - throw "overflow" - } + // Returns a Number using 52-bits, the max Javascript can use for integer math + async vint52(): Promise { + const v = await this.vint64() + if (v > Number.MAX_SAFE_INTEGER) { + throw "overflow" + } - return Number(v) - } + return Number(v) + } - // NOTE: Returns a BigInt instead of a Number - async uint64(): Promise { - const view = await this.view(8) - return view.getBigUint64(0) - } + // NOTE: Returns a BigInt instead of a Number + async uint64(): Promise { + const view = await this.view(8) + return view.getBigUint64(0) + } - // NOTE: Returns a BigInt instead of a Number - async vint64(): Promise { - const peek = await this.peek(1) - const first = new DataView( - peek.buffer, - peek.byteOffset, - peek.byteLength - ).getUint8(0) - const size = (first & 0xc0) >> 6 + // NOTE: Returns a BigInt instead of a Number + async vint64(): Promise { + const peek = await this.peek(1) + const first = new DataView(peek.buffer, peek.byteOffset, peek.byteLength).getUint8(0) + const size = (first & 0xc0) >> 6 - switch (size) { - case 0: { - const v = await this.uint8() - return BigInt(v) & 0x3fn - } - case 1: { - const v = await this.uint16() - return BigInt(v) & 0x3fffn - } - case 2: { - const v = await this.uint32() - return BigInt(v) & 0x3fffffffn - } - case 3: { - const v = await this.uint64() - return v & 0x3fffffffffffffffn - } - default: - throw "impossible" - } - } + switch (size) { + case 0: { + const v = await this.uint8() + return BigInt(v) & 0x3fn + } + case 1: { + const v = await this.uint16() + return BigInt(v) & 0x3fffn + } + case 2: { + const v = await this.uint32() + return BigInt(v) & 0x3fffffffn + } + case 3: { + const v = await this.uint64() + return v & 0x3fffffffffffffffn + } + default: + throw "impossible" + } + } - async done(): Promise { - try { - await this.peek(1) - return false - } catch (err) { - return true // Assume EOF - } - } + async done(): Promise { + try { + await this.peek(1) + return false + } catch (err) { + return true // Assume EOF + } + } } diff --git a/web/src/stream/writer.ts b/web/src/stream/writer.ts index 21b3054..0210051 100644 --- a/web/src/stream/writer.ts +++ b/web/src/stream/writer.ts @@ -1,100 +1,100 @@ // Writer wraps a stream and writes chunks of data export default class Writer { - buffer: ArrayBuffer - writer: WritableStreamDefaultWriter + buffer: ArrayBuffer + writer: WritableStreamDefaultWriter - constructor(stream: WritableStream) { - this.buffer = new ArrayBuffer(8) - this.writer = stream.getWriter() - } + constructor(stream: WritableStream) { + this.buffer = new ArrayBuffer(8) + this.writer = stream.getWriter() + } - release() { - this.writer.releaseLock() - } + release() { + this.writer.releaseLock() + } - async close() { - return this.writer.close() - } + async close() { + return this.writer.close() + } - async uint8(v: number) { - const view = new DataView(this.buffer, 0, 1) - view.setUint8(0, v) - return this.writer.write(view) - } + async uint8(v: number) { + const view = new DataView(this.buffer, 0, 1) + view.setUint8(0, v) + return this.writer.write(view) + } - async uint16(v: number) { - const view = new DataView(this.buffer, 0, 2) - view.setUint16(0, v) - return this.writer.write(view) - } + async uint16(v: number) { + const view = new DataView(this.buffer, 0, 2) + view.setUint16(0, v) + return this.writer.write(view) + } - async uint24(v: number) { - const v1 = (v >> 16) & 0xff - const v2 = (v >> 8) & 0xff - const v3 = v & 0xff + async uint24(v: number) { + const v1 = (v >> 16) & 0xff + const v2 = (v >> 8) & 0xff + const v3 = v & 0xff - const view = new DataView(this.buffer, 0, 3) - view.setUint8(0, v1) - view.setUint8(1, v2) - view.setUint8(2, v3) + const view = new DataView(this.buffer, 0, 3) + view.setUint8(0, v1) + view.setUint8(1, v2) + view.setUint8(2, v3) - return this.writer.write(view) - } + return this.writer.write(view) + } - async uint32(v: number) { - const view = new DataView(this.buffer, 0, 4) - view.setUint32(0, v) - return this.writer.write(view) - } + async uint32(v: number) { + const view = new DataView(this.buffer, 0, 4) + view.setUint32(0, v) + return this.writer.write(view) + } - async uint52(v: number) { - if (v > Number.MAX_SAFE_INTEGER) { - throw "value too large" - } + async uint52(v: number) { + if (v > Number.MAX_SAFE_INTEGER) { + throw "value too large" + } - this.uint64(BigInt(v)) - } + this.uint64(BigInt(v)) + } - async vint52(v: number) { - if (v > Number.MAX_SAFE_INTEGER) { - throw "value too large" - } + async vint52(v: number) { + if (v > Number.MAX_SAFE_INTEGER) { + throw "value too large" + } - if (v < 1 << 6) { - return this.uint8(v) - } else if (v < 1 << 14) { - return this.uint16(v | 0x4000) - } else if (v < 1 << 30) { - return this.uint32(v | 0x80000000) - } else { - return this.uint64(BigInt(v) | 0xc000000000000000n) - } - } + if (v < 1 << 6) { + return this.uint8(v) + } else if (v < 1 << 14) { + return this.uint16(v | 0x4000) + } else if (v < 1 << 30) { + return this.uint32(v | 0x80000000) + } else { + return this.uint64(BigInt(v) | 0xc000000000000000n) + } + } - async uint64(v: bigint) { - const view = new DataView(this.buffer, 0, 8) - view.setBigUint64(0, v) - return this.writer.write(view) - } + async uint64(v: bigint) { + const view = new DataView(this.buffer, 0, 8) + view.setBigUint64(0, v) + return this.writer.write(view) + } - async vint64(v: bigint) { - if (v < 1 << 6) { - return this.uint8(Number(v)) - } else if (v < 1 << 14) { - return this.uint16(Number(v) | 0x4000) - } else if (v < 1 << 30) { - return this.uint32(Number(v) | 0x80000000) - } else { - return this.uint64(v | 0xc000000000000000n) - } - } + async vint64(v: bigint) { + if (v < 1 << 6) { + return this.uint8(Number(v)) + } else if (v < 1 << 14) { + return this.uint16(Number(v) | 0x4000) + } else if (v < 1 << 30) { + return this.uint32(Number(v) | 0x80000000) + } else { + return this.uint64(v | 0xc000000000000000n) + } + } - async bytes(buffer: ArrayBuffer) { - return this.writer.write(buffer) - } + async bytes(buffer: ArrayBuffer) { + return this.writer.write(buffer) + } - async string(str: string) { - const data = new TextEncoder().encode(str) - return this.writer.write(data) - } + async string(str: string) { + const data = new TextEncoder().encode(str) + return this.writer.write(data) + } } diff --git a/web/src/transport/index.ts b/web/src/transport/index.ts index 13d16de..d2e4e55 100644 --- a/web/src/transport/index.ts +++ b/web/src/transport/index.ts @@ -2,95 +2,95 @@ import * as Stream from "../stream" import * as Interface from "./interface" export interface Config { - url: string - fingerprint?: WebTransportHash // the certificate fingerprint, temporarily needed for local development + url: string + fingerprint?: WebTransportHash // the certificate fingerprint, temporarily needed for local development } export default class Transport { - quic: Promise - api: Promise - callback?: Interface.Callback + quic: Promise + api: Promise + callback?: Interface.Callback - constructor(config: Config) { - this.quic = this.connect(config) + constructor(config: Config) { + this.quic = this.connect(config) - // Create a unidirectional stream for all of our messages - this.api = this.quic.then((q) => { - return q.createUnidirectionalStream() - }) + // Create a unidirectional stream for all of our messages + this.api = this.quic.then((q) => { + return q.createUnidirectionalStream() + }) - // async functions - this.receiveStreams() - } + // async functions + this.receiveStreams() + } - async close() { - ;(await this.quic).close() - } + async close() { + ;(await this.quic).close() + } - // Helper function to make creating a promise easier - private async connect(config: Config): Promise { - const options: WebTransportOptions = {} - if (config.fingerprint) { - options.serverCertificateHashes = [config.fingerprint] - } + // Helper function to make creating a promise easier + private async connect(config: Config): Promise { + const options: WebTransportOptions = {} + if (config.fingerprint) { + options.serverCertificateHashes = [config.fingerprint] + } - const quic = new WebTransport(config.url, options) - await quic.ready - return quic - } + const quic = new WebTransport(config.url, options) + await quic.ready + return quic + } - async sendMessage(msg: any) { - const payload = JSON.stringify(msg) - const size = payload.length + 8 + async sendMessage(msg: any) { + const payload = JSON.stringify(msg) + const size = payload.length + 8 - const stream = await this.api + const stream = await this.api - const writer = new Stream.Writer(stream) - await writer.uint32(size) - await writer.string("warp") - await writer.string(payload) - writer.release() - } + const writer = new Stream.Writer(stream) + await writer.uint32(size) + await writer.string("warp") + await writer.string(payload) + writer.release() + } - async receiveStreams() { - const q = await this.quic - const streams = q.incomingUnidirectionalStreams.getReader() + async receiveStreams() { + const q = await this.quic + const streams = q.incomingUnidirectionalStreams.getReader() - for (;;) { - const result = await streams.read() - if (result.done) break + for (;;) { + const result = await streams.read() + if (result.done) break - const stream = result.value - this.handleStream(stream) // don't await - } - } + const stream = result.value + this.handleStream(stream) // don't await + } + } - async handleStream(stream: ReadableStream) { - const r = new Stream.Reader(stream) + async handleStream(stream: ReadableStream) { + const r = new Stream.Reader(stream) - while (!(await r.done())) { - const size = await r.uint32() - const typ = new TextDecoder("utf-8").decode(await r.bytes(4)) + while (!(await r.done())) { + const size = await r.uint32() + const typ = new TextDecoder("utf-8").decode(await r.bytes(4)) - if (typ != "warp") throw "expected warp atom" - if (size < 8) throw "atom too small" + if (typ != "warp") throw "expected warp atom" + if (size < 8) throw "atom too small" - const payload = new TextDecoder("utf-8").decode(await r.bytes(size - 8)) - const msg = JSON.parse(payload) + const payload = new TextDecoder("utf-8").decode(await r.bytes(size - 8)) + const msg = JSON.parse(payload) - if (msg.init) { - return this.callback?.onInit({ - buffer: r.buffer, - reader: r.reader, - }) - } else if (msg.segment) { - return this.callback?.onSegment({ - buffer: r.buffer, - reader: r.reader, - }) - } else { - console.warn("unknown message", msg) - } - } - } + if (msg.init) { + return this.callback?.onInit({ + buffer: r.buffer, + reader: r.reader, + }) + } else if (msg.segment) { + return this.callback?.onSegment({ + buffer: r.buffer, + reader: r.reader, + }) + } else { + console.warn("unknown message", msg) + } + } + } } diff --git a/web/src/transport/interface.ts b/web/src/transport/interface.ts index 5626c3c..84a4276 100644 --- a/web/src/transport/interface.ts +++ b/web/src/transport/interface.ts @@ -1,14 +1,14 @@ export interface Callback { - onInit(init: Init): any - onSegment(segment: Segment): any + onInit(init: Init): any + onSegment(segment: Segment): any } export interface Init { - buffer: Uint8Array // unread buffered data - reader: ReadableStream // unread unbuffered data + buffer: Uint8Array // unread buffered data + reader: ReadableStream // unread unbuffered data } export interface Segment { - buffer: Uint8Array // unread buffered data - reader: ReadableStream // unread unbuffered data + buffer: Uint8Array // unread buffered data + reader: ReadableStream // unread unbuffered data } diff --git a/web/src/transport/message.ts b/web/src/transport/message.ts index c120058..6dee40c 100644 --- a/web/src/transport/message.ts +++ b/web/src/transport/message.ts @@ -3,5 +3,5 @@ export type Init = any export type Segment = any export interface Debug { - max_bitrate: number + max_bitrate: number } diff --git a/web/src/transport/webtransport.d.ts b/web/src/transport/webtransport.d.ts index 34394ca..7ac6b8d 100644 --- a/web/src/transport/webtransport.d.ts +++ b/web/src/transport/webtransport.d.ts @@ -8,77 +8,77 @@ declare module "webtransport" */ interface WebTransportDatagramDuplexStream { - readonly readable: ReadableStream - readonly writable: WritableStream - readonly maxDatagramSize: number - incomingMaxAge: number - outgoingMaxAge: number - incomingHighWaterMark: number - outgoingHighWaterMark: number + readonly readable: ReadableStream + readonly writable: WritableStream + readonly maxDatagramSize: number + incomingMaxAge: number + outgoingMaxAge: number + incomingHighWaterMark: number + outgoingHighWaterMark: number } interface WebTransport { - getStats(): Promise - readonly ready: Promise - readonly closed: Promise - close(closeInfo?: WebTransportCloseInfo): undefined - readonly datagrams: WebTransportDatagramDuplexStream - createBidirectionalStream(): Promise - readonly incomingBidirectionalStreams: ReadableStream - createUnidirectionalStream(): Promise - readonly incomingUnidirectionalStreams: ReadableStream + getStats(): Promise + readonly ready: Promise + readonly closed: Promise + close(closeInfo?: WebTransportCloseInfo): undefined + readonly datagrams: WebTransportDatagramDuplexStream + createBidirectionalStream(): Promise + readonly incomingBidirectionalStreams: ReadableStream + createUnidirectionalStream(): Promise + readonly incomingUnidirectionalStreams: ReadableStream } declare const WebTransport: { - prototype: WebTransport - new (url: string, options?: WebTransportOptions): WebTransport + prototype: WebTransport + new (url: string, options?: WebTransportOptions): WebTransport } interface WebTransportHash { - algorithm?: string - value?: BufferSource + algorithm?: string + value?: BufferSource } interface WebTransportOptions { - allowPooling?: boolean - serverCertificateHashes?: Array + allowPooling?: boolean + serverCertificateHashes?: Array } interface WebTransportCloseInfo { - closeCode?: number - reason?: string + closeCode?: number + reason?: string } interface WebTransportStats { - timestamp?: DOMHighResTimeStamp - bytesSent?: number - packetsSent?: number - numOutgoingStreamsCreated?: number - numIncomingStreamsCreated?: number - bytesReceived?: number - packetsReceived?: number - minRtt?: DOMHighResTimeStamp - numReceivedDatagramsDropped?: number + timestamp?: DOMHighResTimeStamp + bytesSent?: number + packetsSent?: number + numOutgoingStreamsCreated?: number + numIncomingStreamsCreated?: number + bytesReceived?: number + packetsReceived?: number + minRtt?: DOMHighResTimeStamp + numReceivedDatagramsDropped?: number } interface WebTransportBidirectionalStream { - readonly readable: ReadableStream - readonly writable: WritableStream + readonly readable: ReadableStream + readonly writable: WritableStream } interface WebTransportError extends DOMException { - readonly source: WebTransportErrorSource - readonly streamErrorCode: number + readonly source: WebTransportErrorSource + readonly streamErrorCode: number } declare const WebTransportError: { - prototype: WebTransportError - new (init?: WebTransportErrorInit): WebTransportError + prototype: WebTransportError + new (init?: WebTransportErrorInit): WebTransportError } interface WebTransportErrorInit { - streamErrorCode?: number - message?: string + streamErrorCode?: number + message?: string } type WebTransportErrorSource = "stream" | "session" diff --git a/web/src/util/deferred.ts b/web/src/util/deferred.ts index 68de7e9..4ef9ee1 100644 --- a/web/src/util/deferred.ts +++ b/web/src/util/deferred.ts @@ -1,20 +1,20 @@ export default class Deferred { - promise: Promise - resolve: (value: T | PromiseLike) => void - reject: (value: T | PromiseLike) => void + promise: Promise + resolve: (value: T | PromiseLike) => void + reject: (value: T | PromiseLike) => void - constructor() { - // Set initial values so TS stops being annoying. - this.resolve = (_value: T | PromiseLike) => { - /* noop */ - } - this.reject = (_value: T | PromiseLike) => { - /* noop */ - } + constructor() { + // Set initial values so TS stops being annoying. + this.resolve = (_value: T | PromiseLike) => { + /* noop */ + } + this.reject = (_value: T | PromiseLike) => { + /* noop */ + } - this.promise = new Promise((resolve, reject) => { - this.resolve = resolve - this.reject = reject - }) - } + this.promise = new Promise((resolve, reject) => { + this.resolve = resolve + this.reject = reject + }) + } } diff --git a/web/tsconfig.json b/web/tsconfig.json index 7966004..d1a39a8 100644 --- a/web/tsconfig.json +++ b/web/tsconfig.json @@ -1,9 +1,9 @@ { - "include": ["src/**/*"], - "compilerOptions": { - "target": "es2022", - "module": "es2022", - "moduleResolution": "node", - "strict": true - } + "include": ["src/**/*"], + "compilerOptions": { + "target": "es2022", + "module": "es2022", + "moduleResolution": "node", + "strict": true + } } diff --git a/web/yarn.lock b/web/yarn.lock index c1340ea..6abab3f 100644 --- a/web/yarn.lock +++ b/web/yarn.lock @@ -1265,6 +1265,13 @@ eslint-config-prettier@^8.8.0: resolved "https://registry.yarnpkg.com/eslint-config-prettier/-/eslint-config-prettier-8.8.0.tgz#bfda738d412adc917fd7b038857110efe98c9348" integrity sha512-wLbQiFre3tdGgpDv67NQKnJuTlcUVYHas3k+DZCc2U2BadthoEY4B7hLPvAxaqdyOGCzuLfii2fqGph10va7oA== +eslint-plugin-prettier@^4.2.1: + version "4.2.1" + resolved "https://registry.yarnpkg.com/eslint-plugin-prettier/-/eslint-plugin-prettier-4.2.1.tgz#651cbb88b1dab98bfd42f017a12fa6b2d993f94b" + integrity sha512-f/0rXLXUt0oFYs8ra4w49wYZBG5GKZpAYsJSm6rnYL5uVDjd+zowwMwVZHnAjf4edNrKpCDYfXDgmRE/Ak7QyQ== + dependencies: + prettier-linter-helpers "^1.0.0" + eslint-scope@^5.1.1: version "5.1.1" resolved "https://registry.npmjs.org/eslint-scope/-/eslint-scope-5.1.1.tgz" @@ -1374,6 +1381,11 @@ fast-deep-equal@^3.1.1, fast-deep-equal@^3.1.3: resolved "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz" integrity sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q== +fast-diff@^1.1.2: + version "1.3.0" + resolved "https://registry.yarnpkg.com/fast-diff/-/fast-diff-1.3.0.tgz#ece407fa550a64d638536cd727e129c61616e0f0" + integrity sha512-VxPP4NqbUjj6MaAOafWeUn2cXWLcCtljklUtZf0Ind4XQ+QPtmA0b18zZy0jIQx+ExRVCR/ZQpBmik5lXshNsw== + fast-glob@^3.2.9: version "3.2.12" resolved "https://registry.npmjs.org/fast-glob/-/fast-glob-3.2.12.tgz" @@ -1989,6 +2001,13 @@ prelude-ls@^1.2.1: resolved "https://registry.npmjs.org/prelude-ls/-/prelude-ls-1.2.1.tgz" integrity sha512-vkcDPrRZo1QZLbn5RLGPpg/WmIQ65qoWWhcGKf/b5eplkkarX0m9z8ppCat4mlOqUsWpyNuYgO3VRyrYHSzX5g== +prettier-linter-helpers@^1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/prettier-linter-helpers/-/prettier-linter-helpers-1.0.0.tgz#d23d41fe1375646de2d0104d3454a3008802cf7b" + integrity sha512-GbK2cP9nraSSUF9N2XwUwqfzlAFlMNYYl+ShE/V+H8a9uNl/oUqB1w2EL54Jh0OlyRSd8RfWYJ3coVS4TROP2w== + dependencies: + fast-diff "^1.1.2" + prettier@^2.8.8: version "2.8.8" resolved "https://registry.yarnpkg.com/prettier/-/prettier-2.8.8.tgz#e8c5d7e98a4305ffe3de2e1fc4aca1a71c28b1da"