Skip to content

Commit

Permalink
Merge pull request #460 from open-telemetry/batch-timeout-fix
Browse files Browse the repository at this point in the history
expanded exporter APIs with explicitTimeout on force flush and shutdown
  • Loading branch information
bryce-b authored Sep 21, 2023
2 parents 5c74e87 + cbaad29 commit 8d6fa74
Show file tree
Hide file tree
Showing 50 changed files with 2,198 additions and 2,115 deletions.
6 changes: 3 additions & 3 deletions Sources/Exporters/DatadogExporter/DatadogExporter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class DatadogExporter: SpanExporter, MetricExporter {
metricsExporter = try MetricsExporter(config: configuration)
}

public func export(spans: [SpanData]) -> SpanExporterResultCode {
public func export(spans: [SpanData], explicitTimeout: TimeInterval?) -> SpanExporterResultCode {
spans.forEach {
if $0.traceFlags.sampled || configuration.exportUnsampledSpans {
spansExporter?.exportSpan(span: $0)
Expand All @@ -38,7 +38,7 @@ public class DatadogExporter: SpanExporter, MetricExporter {
return .success
}

public func flush() -> SpanExporterResultCode {
public func flush(explicitTimeout: TimeInterval?) -> SpanExporterResultCode {
spansExporter?.tracesStorage.writer.queue.sync {}
logsExporter?.logsStorage.writer.queue.sync {}
metricsExporter?.metricsStorage.writer.queue.sync {}
Expand All @@ -49,7 +49,7 @@ public class DatadogExporter: SpanExporter, MetricExporter {
return .success
}

public func shutdown() {
public func shutdown(explicitTimeout: TimeInterval?) {
_ = self.flush()
}

Expand Down
64 changes: 32 additions & 32 deletions Sources/Exporters/InMemory/InMemoryExporter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,38 +7,38 @@ import Foundation
import OpenTelemetrySdk

public class InMemoryExporter: SpanExporter {
private var finishedSpanItems: [SpanData] = []
private var isRunning: Bool = true

public init() {}

public func getFinishedSpanItems() -> [SpanData] {
return finishedSpanItems
}

public func export(spans: [SpanData]) -> SpanExporterResultCode {
guard isRunning else {
return .failure
}

finishedSpanItems.append(contentsOf: spans)
return .success
private var finishedSpanItems: [SpanData] = []
private var isRunning: Bool = true

public init() {}

public func getFinishedSpanItems() -> [SpanData] {
return finishedSpanItems
}

public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
guard isRunning else {
return .failure
}

public func flush() -> SpanExporterResultCode {
guard isRunning else {
return .failure
}

return .success
}

public func reset() {
finishedSpanItems.removeAll()
}

public func shutdown() {
finishedSpanItems.removeAll()
isRunning = false

finishedSpanItems.append(contentsOf: spans)
return .success
}

public func flush(explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
guard isRunning else {
return .failure
}

return .success
}

public func reset() {
finishedSpanItems.removeAll()
}

public func shutdown(explicitTimeout: TimeInterval? = nil) {
finishedSpanItems.removeAll()
isRunning = false
}
}
46 changes: 23 additions & 23 deletions Sources/Exporters/Jaeger/JaegerSpanExporter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,29 @@ import OpenTelemetrySdk
import Thrift

public class JaegerSpanExporter: SpanExporter {
let collectorAddress: String
let process: Process

public init(serviceName: String, collectorAddress: String) {
process = Process(serviceName: serviceName, tags: TList<Tag>())
self.collectorAddress = collectorAddress
}

public func export(spans: [SpanData]) -> SpanExporterResultCode {
var spanList = TList<Span>()
spanList.append(contentsOf: Adapter.toJaeger(spans: spans))
let batch = Batch(process: process, spans: spanList)
let sender = Sender(host: collectorAddress)
let success = sender.sendBatch(batch: batch)
return success ? SpanExporterResultCode.success : SpanExporterResultCode.failure
}

public func flush() -> SpanExporterResultCode {
return .success
}

public func shutdown() {
}
let collectorAddress: String
let process: Process
public init(serviceName: String, collectorAddress: String) {
process = Process(serviceName: serviceName, tags: TList<Tag>())
self.collectorAddress = collectorAddress
}
public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
var spanList = TList<Span>()
spanList.append(contentsOf: Adapter.toJaeger(spans: spans))
let batch = Batch(process: process, spans: spanList)
let sender = Sender(host: collectorAddress)
let success = sender.sendBatch(batch: batch)
return success ? SpanExporterResultCode.success : SpanExporterResultCode.failure
}
public func flush(explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
return .success
}
public func shutdown(explicitTimeout: TimeInterval? = nil) {
}
}

#endif
102 changes: 51 additions & 51 deletions Sources/Exporters/OpenTelemetryProtocolGrpc/logs/OtlpLogExporter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,59 +13,59 @@ import OpenTelemetrySdk
import OpenTelemetryProtocolExporterCommon

public class OtlpLogExporter : LogRecordExporter {
let channel : GRPCChannel
var logClient : Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient
let config : OtlpConfiguration
var callOptions : CallOptions

public init(channel: GRPCChannel,
config: OtlpConfiguration = OtlpConfiguration(),
logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }),
envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes){
self.channel = channel
logClient = Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient(channel: channel)
self.config = config
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else if let headers = config.headers {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
}
else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
}
let channel : GRPCChannel
var logClient : Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient
let config : OtlpConfiguration
var callOptions : CallOptions

public init(channel: GRPCChannel,
config: OtlpConfiguration = OtlpConfiguration(),
logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }),
envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes){
self.channel = channel
logClient = Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient(channel: channel)
self.config = config
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else if let headers = config.headers {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
}

public func export(logRecords: [ReadableLogRecord]) -> ExportResult {
let logRequest = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in
request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: logRecords)
}

if config.timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(config.timeout.toNanoseconds)))
}


let export = logClient.export(logRequest, callOptions: callOptions)
do {
_ = try export.response.wait()
return .success
} catch {
return .failure
}
else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
}

public func shutdown() {
_ = channel.close()
}

public func export(logRecords: [ReadableLogRecord], explicitTimeout: TimeInterval? = nil) -> ExportResult {
let logRequest = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in
request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: logRecords)
}

public func forceFlush() -> ExportResult {
.success
let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout)
if timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds)))
}


let export = logClient.export(logRequest, callOptions: callOptions)
do {
_ = try export.response.wait()
return .success
} catch {
return .failure
}
}

public func shutdown(explicitTimeout: TimeInterval? = nil) {
_ = channel.close()
}

public func forceFlush(explicitTimeout: TimeInterval? = nil) -> ExportResult {
.success
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,56 +13,58 @@ import OpenTelemetrySdk
import OpenTelemetryProtocolExporterCommon

public class OtlpTraceExporter: SpanExporter {
let channel: GRPCChannel
var traceClient: Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient
let config : OtlpConfiguration
var callOptions : CallOptions

public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) {
self.channel = channel
traceClient = Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient(channel: channel)
self.config = config
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else if let headers = config.headers {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
}

let channel: GRPCChannel
var traceClient: Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient
let config : OtlpConfiguration
var callOptions : CallOptions

public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) {
self.channel = channel
traceClient = Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient(channel: channel)
self.config = config
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else if let headers = config.headers {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
}

public func export(spans: [SpanData]) -> SpanExporterResultCode {
let exportRequest = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: spans)
}

if config.timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(config.timeout.toNanoseconds)))
}

let export = traceClient.export(exportRequest, callOptions: callOptions)

do {
// wait() on the response to stop the program from exiting before the response is received.
_ = try export.response.wait()
return .success
} catch {
return .failure
}
}


public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
let exportRequest = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: spans)
}

public func flush() -> SpanExporterResultCode {
return .success
let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout)
if timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds)))
}

public func shutdown() {
_ = channel.close()

let export = traceClient.export(exportRequest, callOptions: callOptions)

do {
// wait() on the response to stop the program from exiting before the response is received.
_ = try export.response.wait()
return .success
} catch {
return .failure
}
}

public func flush(explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
return .success
}

public func shutdown(explicitTimeout: TimeInterval? = nil) {
_ = channel.close()
}
}
Loading

0 comments on commit 8d6fa74

Please sign in to comment.