Skip to content

Commit

Permalink
Track maximum runtime of currently running jobs (#17)
Browse files Browse the repository at this point in the history
Closes #13
  • Loading branch information
Envek authored Apr 28, 2021
1 parent d39772b commit 91c8f4e
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ end
- Number of jobs in dead set (“morgue”): `sidekiq_jobs_dead_count`
- Active processes count: `sidekiq_active_processes`
- Active workers count: `sidekiq_active_workers_count`
- Maximum runtime of currently executing jobs: `sidekiq_running_job_runtime` (useful for detection of hung jobs, segmented by queue and class name)

## Custom tags

Expand Down
20 changes: 20 additions & 0 deletions lib/yabeda/sidekiq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ module Sidekiq
gauge :jobs_dead_count, tags: [], comment: "The number of jobs exceeded their retry count."
gauge :active_processes, tags: [], comment: "The number of active Sidekiq worker processes."
gauge :queue_latency, tags: %i[queue], comment: "The queue latency, the difference in seconds since the oldest job in the queue was enqueued"
gauge :running_job_runtime, tags: %i[queue worker], aggregation: :max, unit: :seconds,
comment: "How long currently running jobs are running (useful for detection of hung jobs)"

histogram :job_latency, comment: "The job latency, the difference in seconds between enqueued and running time",
unit: :seconds, per: :job,
Expand All @@ -59,6 +61,8 @@ module Sidekiq
sidekiq_queue_latency.set({ queue: queue.name }, queue.latency)
end

Yabeda::Sidekiq.track_max_job_runtime

# That is quite slow if your retry set is large
# I don't want to enable it by default
# retries_by_queues =
Expand Down Expand Up @@ -105,6 +109,22 @@ def custom_tags(worker, job)

worker.method(:yabeda_tags).arity.zero? ? worker.yabeda_tags : worker.yabeda_tags(*job["args"])
end

# Hash of hashes containing all currently running jobs' start timestamps
# to calculate maximum durations of currently running not yet completed jobs
# { { queue: "default", worker: "SomeJob" } => { "jid1" => 100500, "jid2" => 424242 } }
attr_accessor :jobs_started_at

# Publishes, for every label set present in +jobs_started_at+, the runtime of
# its longest-running in-flight job to the +running_job_runtime+ gauge.
#
# A label set with no jobs currently tracked is reported as 0, so the gauge
# drops back once all of its jobs have finished instead of keeping the last
# observed value.
#
# Start timestamps are compared against the monotonic clock, so the stored
# values are expected to come from the same clock.
#
# @return [void]
def track_max_job_runtime
  now = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # Iterate over a snapshot rather than the live hash: new label sets can be
  # inserted into +jobs_started_at+ while this loop runs, and Ruby raises
  # "can't add a new key into hash during iteration" when that happens.
  ::Yabeda::Sidekiq.jobs_started_at.to_a.each do |labels, jobs|
    # +values+ returns a fresh array, so jobs finishing concurrently cannot
    # disturb the +min+ computation.
    oldest_job_started_at = jobs.values.min
    oldest_job_duration = oldest_job_started_at ? (now - oldest_job_started_at).round(3) : 0
    Yabeda.sidekiq.running_job_runtime.set(labels, oldest_job_duration)
  end
end
end

self.jobs_started_at = Concurrent::Hash.new { |hash, key| hash[key] = Concurrent::Hash.new }
end
end
2 changes: 2 additions & 0 deletions lib/yabeda/sidekiq/server_middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def call(worker, job, queue)
begin
job_instance = ::Sidekiq::Job.new(job)
Yabeda.sidekiq_job_latency.measure(labels, job_instance.latency)
Yabeda::Sidekiq.jobs_started_at[labels][job["jid"]] = start
Yabeda.with_tags(**custom_tags) do
yield
end
Expand All @@ -22,6 +23,7 @@ def call(worker, job, queue)
ensure
Yabeda.sidekiq_job_runtime.measure(labels, elapsed(start))
Yabeda.sidekiq_jobs_executed_total.increment(labels)
Yabeda::Sidekiq.jobs_started_at[labels].delete(job["jid"])
end
end
# rubocop: enable Metrics/AbcSize, Metrics/MethodLength:
Expand Down
9 changes: 9 additions & 0 deletions spec/support/jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ def perform(*_args)
end
end

# Spec-only worker that stays busy for 50ms, long enough for an example to
# observe it while it is still running.
class SampleLongRunningJob
  include Sidekiq::Worker

  # Ignores any arguments, sleeps for 50ms and returns a completion message.
  def perform(*)
    sleep(0.05)
    "Phew, I'm done!"
  end
end

class SampleComplexJob
include Sidekiq::Worker

Expand Down
1 change: 1 addition & 0 deletions spec/support/sidekiq_inline_middlewares.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ def push(job)
return super unless Sidekiq::Testing.inline?

job = Sidekiq.load_json(Sidekiq.dump_json(job))
job["jid"] ||= SecureRandom.hex(12)
job_class = Sidekiq::Testing.constantize(job["class"])
job_instance = job_class.new
queue = (job_instance.sidekiq_options_hash || {}).fetch("queue", "default")
Expand Down
38 changes: 38 additions & 0 deletions spec/yabeda/sidekiq_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -167,5 +167,43 @@
expect(Yabeda.sidekiq.jobs_dead_count.values).to eq({ {} => 3 })
expect(Yabeda.sidekiq.jobs_scheduled_count.values).to eq({ {} => 2 })
end

# Checks that the running_job_runtime gauge reports, per { queue, worker } label set,
# how long the oldest in-flight job has been running, and reports 0 once nothing is running.
it "measures maximum runtime of currently running jobs" do
# Start from a clean slate: drop recorded gauge values and tracked job start times
Yabeda.sidekiq.running_job_runtime.values.clear # This is a hack
described_class.jobs_started_at.clear

Sidekiq::Testing.inline! do
# Each job is pushed from its own thread so it is still in flight while this thread
# collects metrics (presumably inline mode runs the job in the calling thread - confirm).
# SampleLongRunningJob sleeps for 0.05s, which leaves room for the two collections below.
workers = []
workers.push(Thread.new { SampleLongRunningJob.perform_async })
sleep 0.012 # Ruby can sleep less than requested
workers.push(Thread.new { SampleLongRunningJob.perform_async })

# Invoke every registered Yabeda collector to refresh the gauges
Yabeda.collectors.each(&:call)
# The first job was started before the sleep above, so at least ~10ms must be reported
expect(Yabeda.sidekiq.running_job_runtime.values).to include(
{ queue: "default", worker: "SampleLongRunningJob" } => (be >= 0.010),
)

sleep 0.012 # Ruby can sleep less than requested
# FailingActiveJob is defined elsewhere; any StandardError it raises is deliberately
# swallowed, only the bookkeeping it leaves behind matters here.
begin
FailingActiveJob.perform_later
rescue StandardError
nil
end
Yabeda.collectors.each(&:call)

# The long-running jobs are still in flight (runtime keeps growing), while the
# already finished FailingActiveJob is expected to be reported as 0
expect(Yabeda.sidekiq.running_job_runtime.values).to include(
{ queue: "default", worker: "SampleLongRunningJob" } => (be >= 0.020),
{ queue: "default", worker: "FailingActiveJob" } => 0,
)

# When all jobs are completed, metric should respond with zero
workers.map(&:join)
Yabeda.collectors.each(&:call)
expect(Yabeda.sidekiq.running_job_runtime.values).to include(
{ queue: "default", worker: "SampleLongRunningJob" } => 0,
{ queue: "default", worker: "FailingActiveJob" } => 0,
)
end
end
end
end
end

0 comments on commit 91c8f4e

Please sign in to comment.