Skip to content

Commit

Permalink
fix: non-deferred interleaved span enter events
Browse files Browse the repository at this point in the history
  • Loading branch information
ten3roberts committed Dec 13, 2023
1 parent fcd9eed commit 09adf54
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 62 deletions.
101 changes: 101 additions & 0 deletions examples/concurrent_eager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

use futures::{pin_mut, FutureExt};
use tracing::Instrument;
use tracing_subscriber::{layer::SubscriberExt, registry::Registry};
use tracing_tree::HierarchicalLayer;

fn main() {
let layer = HierarchicalLayer::default()
.with_writer(std::io::stdout)
.with_indent_lines(true)
.with_indent_amount(4)
.with_thread_names(true)
.with_thread_ids(true)
.with_span_retrace(true)
.with_deferred_spans(false)
.with_targets(true);

let subscriber = Registry::default().with(layer);
tracing::subscriber::set_global_default(subscriber).unwrap();
#[cfg(feature = "tracing-log")]
tracing_log::LogTracer::init().unwrap();

let fut_a = spawn_fut("a", a);
pin_mut!(fut_a);

let waker = futures::task::noop_waker();
let mut cx = Context::from_waker(&waker);
assert!(fut_a.poll_unpin(&mut cx).is_pending());

let fut_b = spawn_fut("b", b);
pin_mut!(fut_b);

assert!(fut_b.poll_unpin(&mut cx).is_pending());

assert!(fut_a.poll_unpin(&mut cx).is_pending());
assert!(fut_b.poll_unpin(&mut cx).is_pending());

assert!(fut_a.poll_unpin(&mut cx).is_ready());
assert!(fut_b.poll_unpin(&mut cx).is_ready());
}

fn spawn_fut<F: Fn() -> Fut, Fut: Future<Output = ()>>(
key: &'static str,
inner: F,
) -> impl Future<Output = ()> {
let span = tracing::info_span!("spawn_fut", key);

async move {
countdown(1).await;

inner().await;
}
.instrument(span)
}

fn a() -> impl Future<Output = ()> {
let span = tracing::info_span!("a");

async move {
countdown(1).await;
tracing::info!("a");
}
.instrument(span)
}

fn b() -> impl Future<Output = ()> {
let span = tracing::info_span!("b");

async move {
countdown(1).await;
tracing::info!("b");
}
.instrument(span)
}

fn countdown(count: u32) -> impl Future<Output = ()> {
CountdownFuture { count }
}

struct CountdownFuture {
count: u32,
}

impl Future for CountdownFuture {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count == 0 {
Poll::Ready(())
} else {
self.count -= 1;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
16 changes: 16 additions & 0 deletions examples/concurrent_eager.stdout
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
1:main┐concurrent_eager::spawn_fut key="a"
1:main┐concurrent_eager::spawn_fut key="b"
1:main┐concurrent_eager::spawn_fut key="a"
1:main├───┐concurrent_eager::a
1:main┐concurrent_eager::spawn_fut key="b"
1:main├───┐concurrent_eager::b
1:main┐concurrent_eager::spawn_fut key="a"
1:main├───┐concurrent_eager::a
1:main│ ├─── Xms INFO concurrent_eager a
1:main├───┘
1:main┐concurrent_eager::spawn_fut key="b"
1:main├───┐concurrent_eager::b
1:main│ ├─── Xms INFO concurrent_eager b
1:main├───┘
1:main┘
1:main┘
123 changes: 61 additions & 62 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ where
Ok(())
}

/// If `span_retrace` ensures that `new_span` is properly printed before an event
/// Ensures that `new_span` and all its ancestors are properly printed before an event
fn write_retrace_span<'a, S>(
&self,
new_span: &SpanRef<'a, S>,
Expand All @@ -276,61 +276,55 @@ where
) where
S: Subscriber + for<'new_span> LookupSpan<'new_span>,
{
let should_write = if self.config.deferred_spans {
if let Some(data) = new_span.extensions_mut().get_mut::<Data>() {
!data.written
} else {
false
}
} else {
false
};

// Also handle deferred spans along with retrace since deferred spans may need to print
// multiple spans at once as a whole tree can be deferred
if self.config.span_retrace || should_write {
let old_span_id = bufs.current_span.replace((new_span.id()).clone());
let old_span_id = old_span_id.as_ref();

if Some(&new_span.id()) != old_span_id {
let old_span = old_span_id.as_ref().and_then(|v| ctx.span(v));
let old_path = old_span.as_ref().map(scope_path).into_iter().flatten();

let new_path = scope_path(new_span);

// Print the path from the common base of the two spans
let new_path = DifferenceIter::new(old_path, new_path, |v| v.id());

for (i, span) in new_path.enumerate() {
// Mark traversed spans as *written*
let was_written = if let Some(data) = span.extensions_mut().get_mut::<Data>() {
mem::replace(&mut data.written, true)
} else {
// `on_new_span` was not called, before
// Consider if this should panic instead, which is *technically* correct but is
// bad behavior for a logging layer in production.
false
};

// Print the previous span before entering a new deferred or retraced span
if i == 0 && self.config.verbose_entry {
if let Some(parent) = &span.parent() {
self.write_span_info(parent, bufs, SpanMode::PreOpen);
}
//
// If a another event occurs right after a previous event in the same span, this will
// simply print nothing since the path to the common lowest ancestor is empty
// if self.config.span_retrace || self.config.deferred_spans {
let old_span_id = bufs.current_span.replace((new_span.id()).clone());
let old_span_id = old_span_id.as_ref();
let new_span_id = new_span.id();

if Some(&new_span_id) != old_span_id {
let old_span = old_span_id.as_ref().and_then(|v| ctx.span(v));
let old_path = old_span.as_ref().map(scope_path).into_iter().flatten();

let new_path = scope_path(new_span);

// Print the path from the common base of the two spans
let new_path = DifferenceIter::new(old_path, new_path, |v| v.id());

for (i, span) in new_path.enumerate() {
// Mark traversed spans as *written*
let was_written = if let Some(data) = span.extensions_mut().get_mut::<Data>() {
mem::replace(&mut data.written, true)
} else {
// `on_new_span` was not called, before
// Consider if this should panic instead, which is *technically* correct but is
// bad behavior for a logging layer in production.
false
};

// Print the previous span before entering a new deferred or retraced span
if i == 0 && self.config.verbose_entry {
if let Some(parent) = &span.parent() {
self.write_span_info(parent, bufs, SpanMode::PreOpen);
}
let verbose = self.config.verbose_entry && i == 0;

self.write_span_info(
&span,
bufs,
if was_written {
SpanMode::Retrace { verbose }
} else {
SpanMode::Open { verbose }
},
)
}
let verbose = self.config.verbose_entry && i == 0;

self.write_span_info(
&span,
bufs,
if was_written {
SpanMode::Retrace { verbose }
} else {
SpanMode::Open { verbose }
},
)
}
// }
}
}

Expand Down Expand Up @@ -491,22 +485,25 @@ where

let bufs = &mut *self.bufs.lock().unwrap();

// Store the most recently entered span
bufs.current_span = Some(span.id());

if self.config.verbose_entry {
if let Some(span) = span.parent() {
self.write_span_info(&span, bufs, SpanMode::PreOpen);
}
}

self.write_span_info(
&span,
bufs,
SpanMode::Open {
verbose: self.config.verbose_entry,
},
);
if self.config.span_retrace {
self.write_retrace_span(&span, bufs, &ctx);
} else {
// Store the most recently entered span
bufs.current_span = Some(span.id());
self.write_span_info(
&span,
bufs,
SpanMode::Open {
verbose: self.config.verbose_entry,
},
);
}
}

fn on_event(&self, event: &Event<'_>, ctx: Context<S>) {
Expand All @@ -518,7 +515,9 @@ where
let bufs = &mut *guard;

if let Some(new_span) = &span {
self.write_retrace_span(new_span, bufs, &ctx);
if self.config.span_retrace || self.config.deferred_spans {
self.write_retrace_span(new_span, bufs, &ctx);
}
}

let mut event_buf = &mut bufs.current_buf;
Expand Down

0 comments on commit 09adf54

Please sign in to comment.