Small fixes.

This commit is contained in:
Luke Curley 2023-07-08 09:13:09 -07:00
parent 7c3eae0a7a
commit cf2a80b323
3 changed files with 18 additions and 10 deletions

View File

@ -94,7 +94,7 @@ impl Session {
let segment = segment::Info { let segment = segment::Info {
sequence: object.sequence, sequence: object.sequence,
send_order: object.send_order, send_order: object.send_order,
expires: Some(time::Instant::now() + time::Duration::from_secs(10)), expires: Some(time::Instant::now() + time::Duration::from_secs(2)), // TODO increase this once send_order is implemented
}; };
let segment = segment::Publisher::new(segment); let segment = segment::Publisher::new(segment);
@ -162,7 +162,7 @@ impl Session {
fn receive_subscribe_error(&mut self, msg: SubscribeError) -> anyhow::Result<()> { fn receive_subscribe_error(&mut self, msg: SubscribeError) -> anyhow::Result<()> {
let error = track::Error { let error = track::Error {
code: msg.code, code: msg.code,
reason: format!("upstream error: {}", msg.reason), reason: msg.reason,
}; };
// Stop producing the track. // Stop producing the track.
@ -284,13 +284,16 @@ impl Publishers {
pub async fn incoming(&mut self) -> anyhow::Result<Subscribe> { pub async fn incoming(&mut self) -> anyhow::Result<Subscribe> {
let (namespace, track) = self.receiver.recv().await.context("no more subscriptions")?; let (namespace, track) = self.receiver.recv().await.context("no more subscriptions")?;
let id = VarInt::try_from(self.next)?;
self.next += 1;
let msg = Subscribe { let msg = Subscribe {
track_id: VarInt::try_from(self.next)?, track_id: id,
track_namespace: namespace, track_namespace: namespace,
track_name: track.name, track_name: track.name.clone(),
}; };
self.next += 1; self.tracks.insert(id, Some(track));
Ok(msg) Ok(msg)
} }

View File

@ -30,11 +30,16 @@ impl Session {
.find(|v| **v == Version::DRAFT_00) .find(|v| **v == Version::DRAFT_00)
.context("failed to find supported version")?; .context("failed to find supported version")?;
// TODO use the role to decide if we can publish or subscribe // Choose our role based on the client's role.
let role = match session.setup().role {
Role::Publisher => Role::Subscriber,
Role::Subscriber => Role::Publisher,
Role::Both => Role::Both,
};
let setup = SetupServer { let setup = SetupServer {
version: Version::DRAFT_00, version: Version::DRAFT_00,
role: Role::Publisher, role,
}; };
let session = session.accept(setup).await?; let session = session.accept(setup).await?;

View File

@ -54,7 +54,7 @@ impl File {
// Create the catalog track // Create the catalog track
let (_catalog, subscriber) = Self::create_catalog(init); let (_catalog, subscriber) = Self::create_catalog(init);
source.insert("catalog".to_string(), subscriber); source.insert("0".to_string(), subscriber);
let mut tracks = HashMap::new(); let mut tracks = HashMap::new();
@ -83,7 +83,7 @@ impl File {
fn create_catalog(raw: Vec<u8>) -> (track::Publisher, track::Subscriber) { fn create_catalog(raw: Vec<u8>) -> (track::Publisher, track::Subscriber) {
// Create a track with a single segment containing the init data. // Create a track with a single segment containing the init data.
let mut catalog = track::Publisher::new("catalog"); let mut catalog = track::Publisher::new("0");
// Subscribe to the catalog before we push the segment. // Subscribe to the catalog before we push the segment.
let subscriber = catalog.subscribe(); let subscriber = catalog.subscribe();
@ -213,7 +213,7 @@ impl Track {
.unwrap(); .unwrap();
// Delete segments after 10s. // Delete segments after 10s.
let expires = Some(now + time::Duration::from_secs(10)); let expires = Some(now + time::Duration::from_secs(2)); // TODO increase this once send order is implemented
let sequence = self.sequence.try_into().context("sequence too large")?; let sequence = self.sequence.try_into().context("sequence too large")?;
self.sequence += 1; self.sequence += 1;