Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync with dev #147

Merged
merged 6 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
851 changes: 376 additions & 475 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions casper-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ form_urlencoded = "1"
futures = "0.3"
futures-util = "0.3"
hex = "0.4.3"
http = "0.2" # used by opentelemetry-http
http = "1.1"
itertools = "0.12"
linked-hash-map = "0.5.4"
log = "0.4"
Expand All @@ -32,12 +32,12 @@ num_cpus = "1.13"
num_threads = "0.1"
once_cell = "1"
openssl = "0.10"
opentelemetry = { version = "0.23", features = ["metrics"] }
opentelemetry-http = "0.12"
opentelemetry-prometheus = "0.16"
opentelemetry-semantic-conventions = "0.15"
opentelemetry-zipkin = { version = "0.21", default-features = false }
opentelemetry_sdk = { version = "0.23", features = ["rt-tokio-current-thread"] }
opentelemetry = { version = "0.24", features = ["metrics"] }
opentelemetry-http = "0.13"
opentelemetry-prometheus = "0.17"
opentelemetry-semantic-conventions = "0.16"
opentelemetry-zipkin = { version = "0.22", default-features = false }
opentelemetry_sdk = { version = "0.24", features = ["rt-tokio-current-thread"] }
ouroboros = "0.18"
parking_lot = "0.12"
percent-encoding = "2.2"
Expand Down Expand Up @@ -66,11 +66,11 @@ tokio-stream = { version = "0.1", features = ["time"] }

[dependencies.mlua]
features = ["luau-jit", "async", "serialize", "macros", "unstable"]
version = "0.9.8"
version = "0.9.9"

[dependencies.fred]
features = ["enable-native-tls"]
version = "9.0.3"
version = "9.2.1"

[target.'cfg(target_os = "linux")'.dependencies]
tikv-jemallocator = "0.5"
tikv-jemallocator = "0.6"
12 changes: 8 additions & 4 deletions casper-server/src/http/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ use ntex::http::uri::{InvalidUri, InvalidUriParts, Scheme, Uri};
use ntex::http::StatusCode;
use opentelemetry::trace::{self, TraceContextExt as _, Tracer as _};
use opentelemetry::{global, Context, KeyValue};
use opentelemetry_semantic_conventions::trace::{
HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, URL_PATH, URL_QUERY,
};
use scopeguard::defer;
use tracing::{debug, instrument, Span};

Expand Down Expand Up @@ -67,8 +70,9 @@ pub async fn proxy_to_upstream(
.span_builder("proxy_to_upstream")
.with_kind(trace::SpanKind::Client)
.with_attributes([
KeyValue::new("request.method", req.method().to_string()),
KeyValue::new("request.uri", req.uri().to_string()),
KeyValue::new(HTTP_REQUEST_METHOD, req.method().to_string()),
KeyValue::new(URL_PATH, req.uri().path().to_string()),
KeyValue::new(URL_QUERY, req.uri().query().unwrap_or_default().to_string()),
])
.start(&tracer);
cx = cx.with_span(span);
Expand All @@ -89,7 +93,7 @@ pub async fn proxy_to_upstream(
let span = cx.span();
defer! { span.end(); }
let status_i64 = resp.status().as_u16() as i64;
span.set_attribute(KeyValue::new("response.status_code", status_i64));
span.set_attribute(KeyValue::new(HTTP_RESPONSE_STATUS_CODE, status_i64));
if resp.status().is_server_error() {
span.set_status(trace::Status::error("server error"));
} else if resp.status().is_success() {
Expand All @@ -112,7 +116,7 @@ pub async fn proxy_to_upstream(
_ => return Err(err.to_string().into_lua_err()),
};
let status_i64 = status.as_u16() as i64;
span.set_attribute(KeyValue::new("response.status_code", status_i64));
span.set_attribute(KeyValue::new(HTTP_RESPONSE_STATUS_CODE, status_i64));

let mut resp = LuaResponse::new(LuaBody::from(err.to_string()));
*resp.status_mut() = status;
Expand Down
3 changes: 1 addition & 2 deletions casper-server/src/lua/http/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ impl LuaBody {
match self {
LuaBody::None => Ok(None),
LuaBody::Bytes(bytes) => Ok(Some(bytes.clone())),
_ => Ok(self.read().await?.map(|b| {
_ => Ok(self.read().await?.inspect(|b| {
*self = LuaBody::Bytes(b.clone());
b
})),
}
}
Expand Down
3 changes: 2 additions & 1 deletion casper-server/src/lua/http/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ impl UserData for LuaResponse {
#[cfg(test)]
mod tests {
use mlua::{chunk, AnyUserData, Lua, Result};
use opentelemetry::Key;

use super::*;

Expand Down Expand Up @@ -591,7 +592,7 @@ mod tests {
.eval()
.unwrap();
let resp = resp.take::<LuaResponse>()?;
assert_eq!(resp.labels().unwrap()[&"hello".into()], "world".into());
assert_eq!(resp.labels().unwrap()[&Key::from("hello")], "world".into());

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions casper-server/src/lua/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ where
.into_iter()
.map(|res| match res {
Ok(_) => Ok(Value::Boolean(true)),
Err(err) => Ok(Value::String(lua.create_string(&err.into().to_string())?)),
Err(err) => Ok(Value::String(lua.create_string(err.into().to_string())?)),
})
.collect::<LuaResult<Vec<_>>>()?;
Ok((false, Some(results)))
Expand Down Expand Up @@ -285,7 +285,7 @@ where
.into_iter()
.map(|res| match res {
Ok(size) => Ok(Value::Integer(size as _)),
Err(err) => Ok(Value::String(lua.create_string(&err.into().to_string())?)),
Err(err) => Ok(Value::String(lua.create_string(err.into().to_string())?)),
})
.collect::<LuaResult<Vec<_>>>()?;
Ok((None, Some(results)))
Expand Down
9 changes: 8 additions & 1 deletion casper-server/src/lua/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ impl UserData for LuaSpan {
Ok(())
});

// Update the span name.
methods.add_method_mut("update_name", |_, this, name: String| {
this.0.update_name(name);
Ok(())
});

// Signals that the operation described by this span has now ended.
methods.add_method_mut("finish", |_, this, ()| {
this.0.end();
Expand Down Expand Up @@ -213,7 +219,8 @@ mod tests {

tracer.in_span("root", |_cx| {
lua.load(chunk! {
local span = $trace.new_span("child", "client")
local span = $trace.new_span("TBC", "client")
span:update_name("child")
span:set_attributes({ foo = 1, hello = "world" })
span:set_status("ok")
span:add_event("event", { bar = 1 })
Expand Down
21 changes: 13 additions & 8 deletions casper-server/src/middleware/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ use ntex::util::Bytes;
use ntex::web::{ErrorRenderer, WebRequest, WebResponse};
use opentelemetry::trace::{self, FutureExt, TraceContextExt, Tracer, TracerProvider as _};
use opentelemetry::{global, Context as OtelContext, KeyValue};
use opentelemetry_semantic_conventions::trace::{
HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, NETWORK_PEER_ADDRESS, URL_PATH, URL_QUERY,
};

use crate::config::TracingConfig;
use crate::http::trace::{ParentSamplingDecision, RequestHeaderCarrier};
Expand Down Expand Up @@ -64,7 +67,6 @@ where
forward_ready!(service);
forward_shutdown!(service);

#[inline]
async fn call(
&self,
req: WebRequest<E>,
Expand All @@ -87,11 +89,15 @@ where
.span_builder(format!("{} {}", req.method(), req.uri().path()))
.with_kind(trace::SpanKind::Server)
.with_attributes([
KeyValue::new("request.method", req.method().to_string()),
KeyValue::new("request.uri", req.uri().to_string()),
KeyValue::new("request.host", connection_info.host().to_string()),
KeyValue::new(HTTP_REQUEST_METHOD, req.method().to_string()),
KeyValue::new(URL_PATH, req.uri().path().to_string()),
KeyValue::new(URL_QUERY, req.uri().query().unwrap_or_default().to_string()),
KeyValue::new(
"http.request.header.host",
connection_info.host().to_string(),
),
KeyValue::new(
"request.peer_addr",
NETWORK_PEER_ADDRESS,
req.peer_addr()
.map(|addr| addr.to_string())
.unwrap_or_default(),
Expand Down Expand Up @@ -121,7 +127,7 @@ where

let status = response.status();
span.set_attribute(KeyValue::new(
"response.status_code",
HTTP_RESPONSE_STATUS_CODE,
status.as_u16() as i64,
));
if status.is_server_error() {
Expand All @@ -135,10 +141,9 @@ where
ResponseBody::Other(Body::from_message(StreamSpan { body, otel_cx }))
})
})
.map_err(|err| {
.inspect_err(|err| {
let span = otel_cx.span();
span.set_status(trace::Status::error(err.to_string()));
err
})
}
}
Expand Down
6 changes: 3 additions & 3 deletions casper-server/src/storage/backends/redis/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ impl RedisBackend {
num_chunks += 1;
// Store chunk in Redis
self.pool
.set(
.set::<(), _, _>(
make_chunk_key(&item.key, i as u32 + 1),
RedisValue::Bytes(chunk.to_vec().into()),
Some(Expiration::EX(ttl as i64)),
Expand Down Expand Up @@ -458,7 +458,7 @@ impl RedisBackend {

// Store response item
self.pool
.set(
.set::<(), _, _>(
make_redis_key(&item.key),
RedisValue::Bytes(response_item_enc.into()),
Some(Expiration::EX(ttl as i64)),
Expand Down Expand Up @@ -505,7 +505,7 @@ impl RedisBackend {
if refresh_ttl && rand::random::<u8>() % 100 < 1 {
// Refresh TTL with 1% probability
self.pool
.expire(make_redis_key(&skey), SURROGATE_KEYS_TTL)
.expire::<(), _>(make_redis_key(&skey), SURROGATE_KEYS_TTL)
.await?;
}
anyhow::Ok(())
Expand Down
2 changes: 1 addition & 1 deletion casper-server/src/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fn init_opentelemetry(config: &Config) {
let mut pipeline_builder = opentelemetry_zipkin::new_pipeline()
.with_http_client(spawn_http_client())
.with_trace_config(
trace::config()
trace::Config::default()
.with_sampler(sampler)
.with_id_generator(RandomIdGenerator::default()),
);
Expand Down
Loading