Skip to content

Commit

Permalink
Add delay on entity re-ingestion on failed attempt
Browse files Browse the repository at this point in the history
When an entity failed to be deposited, PMPY would only wait the minimum age to retry to ingest the entity. This creates the problem where we don't let the problem to be fixed and we flood the logs with information that is no longer useful.

This change doubles the time of each attempt to give space for the problems to be fixed. There are 3 configuration parameters added:

- ingestion_prefix : The prefix used for redis keys storing the failed attempt count - Default value of  - Default value of `prod:pmpy_ingest_attempt:`
- ingestion_attempts : The max number of re-ingestion attempts - Default value of `15`
- first_failed_wait : The time to add at the beginning which will be double every retry - Default value of `10`

The default values will keep retrying for around a week and will drop the entity for ingestion after that.

This is related to issue [#297](#297).
  • Loading branch information
lagoan committed Jan 30, 2023
1 parent 931cdac commit 053c11a
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and releases in PushmiPullyu adheres to [Semantic Versioning](https://semver.org

- Add rescue block to catch exceptions while waiting for next item [#280](https://github.com/ualbertalib/pushmi_pullyu/issues/280)
- Add logic to fetch new community and collection information from jupiter and create their AIPS. [#255](https://github.com/ualbertalib/pushmi_pullyu/issues/255)
- Add delay to re-ingestion attempts to allow for problems to be fixed [#297](https://github.com/ualbertalib/pushmi_pullyu/issues/297)
- Bump git from 1.9.1 to 1.13.0
## [2.0.4] - 2022-11-22

Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ Specific options:
-W, --workdir PATH Path for directory where AIP creation work takes place in
-N, --process_name NAME Name of the application process
-m, --monitor Start monitor process for a deamon
-q, --queue NAME Name of the queue to read from
-q, --queue NAME Name of the queue to read from
-i, --ingestion_prefix PREFIX Prefix for keys used in counting the number of failed ingestion attempts
-x, --ingestion_attempts NUMBER Max number of attempts to try ingesting an entity
-f, --first_failed_wait NUMBER Time in seconds to wait after first failed entity deposit. This time will double every failed attempt
Common options:
-v, --version Show version
Expand Down
3 changes: 3 additions & 0 deletions examples/pushmi_pullyu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ piddir: tmp/pids
workdir: tmp/work
process_name: pushmi_pullyu
queue_name: dev:pmpy_queue
ingestion_prefix: "'prod:pmpy_ingest_attempt:'"
ingestion_attempts: 15
first_failed_wait: 10
minimum_age: 0

redis:
Expand Down
3 changes: 3 additions & 0 deletions lib/pushmi_pullyu.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ module PushmiPullyu
workdir: 'tmp/work',
process_name: 'pushmi_pullyu',
queue_name: 'dev:pmpy_queue',
ingestion_prefix: 'prod:pmpy_ingest_attempt:',
ingestion_attempts: 15,
first_failed_wait: 10,
redis: {
url: 'redis://localhost:6379'
},
Expand Down
41 changes: 29 additions & 12 deletions lib/pushmi_pullyu/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,21 @@ def parse_options(argv)
opts[:queue_name] = queue
end

o.on('-i', '--ingestion_prefix PREFIX',
'Prefix for keys used in counting the number of failed ingestion attempts') do |prefix|
opts[:ingestion_prefix] = prefix
end

o.on('-x', '--ingestion_attempts NUMBER', Integer,
'Max number of attempts to try ingesting an entity') do |ingestion_attempts|
opts[:ingestion_attempts] = ingestion_attempts
end

o.on('-f', '--first_failed_wait NUMBER', Integer,
'Time in seconds to wait after first failed deposit. Time will double every failed attempt') do |failed_wait|
opts[:first_failed_wait] = failed_wait
end

o.separator ''
o.separator 'Common options:'

Expand Down Expand Up @@ -183,16 +198,10 @@ def rotate_logs

def run_preservation_cycle
begin
entity_json = queue.wait_next_item
return unless entity_json

# jupiter is submitting the entries to reddis in a hash format using fat arrows. We need to change them to colons
# in order to parse them correctly from json
entity = JSON.parse(entity_json.gsub('=>', ':'), { symbolize_names: true })
return unless entity[:type].present? && entity[:uuid].present?
entity = queue.wait_next_item
return unless entity && entity[:type].present? && entity[:uuid].present?
rescue StandardError => e
Rollbar.error(e)
logger.error(e)
log_exception(e)
end

# add additional information about the error context to errors that occur while processing this item.
Expand All @@ -209,7 +218,11 @@ def run_preservation_cycle
# readding it to the queue as it will always fail
rescue PushmiPullyu::AIP::EntityInvalid => e
rescue StandardError => e
queue.add_entity_json(entity_json)
begin
queue.add_entity_in_timeframe(entity)
rescue MaxDepositAttemptsReached => e
log_exception(e)
end

# rubocop:disable Lint/RescueException
# Something other than a StandardError exception means something happened which we were not expecting!
Expand All @@ -218,8 +231,7 @@ def run_preservation_cycle
raise e
# rubocop:enable Lint/RescueException
ensure
Rollbar.error(e)
logger.error(e)
log_exception(e)
end
end

Expand Down Expand Up @@ -294,4 +306,9 @@ def start_server_as_daemon
end
end

def log_exception(exception)
Rollbar.error(exception)
logger.error(exception)
end

end
27 changes: 24 additions & 3 deletions lib/pushmi_pullyu/preservation_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
class PushmiPullyu::PreservationQueue

class ConnectionError < StandardError; end
class MaxDepositAttemptsReached < StandardError; end

def initialize(redis_url: 'redis://localhost:6379',
pool_opts: { size: 1, timeout: 5 },
Expand Down Expand Up @@ -50,7 +51,8 @@ def next_item
rd.multi do |tx|
tx.zrem(@queue_name, element) # remove the top element transactionally
end
return element

return JSON.parse(element, { symbolize_names: true })
else
rd.unwatch # cancel the transaction since there was nothing in the queue
return nil
Expand All @@ -68,10 +70,29 @@ def wait_next_item
end
end

def add_entity_json(entity_json)
def add_entity_in_timeframe(entity)
entity_attempts_key = "#{PushmiPullyu.options[:ingestion_prefix]}#{entity[:uuid]}"

@redis.with do |connection|
connection.zadd @queue_name, Time.now.to_f, entity_json
# separate information for priority information and queue
deposit_attempt = connection.incr entity_attempts_key

if deposit_attempt <= PushmiPullyu.options[:ingestion_attempts]
connection.zadd @queue_name, Time.now.to_f + self.class.extra_wait_time(deposit_attempt),
entity.slice(:uuid, :type).to_json
else
connection.del entity_attempts_key
raise MaxDepositAttemptsReached
end
end
end

def self.extra_wait_time(deposit_attempt)
extra = 0
deposit_attempt.times do |n|
extra += (2**n) * PushmiPullyu.options[:first_failed_wait]
end
extra
end

protected
Expand Down
3 changes: 3 additions & 0 deletions spec/fixtures/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ monitor: false
piddir: tmp/spec/pids
workdir: tmp/spec/work
process_name: test_pushmi_pullyu
ingestion_prefix: 'prod:pmpy_ingest_attempt:'
ingestion_attempts: 15
first_failed_wait: 10
minimum_age: 1
queue_name: test:pmpy_queue
rollbar:
Expand Down
6 changes: 1 addition & 5 deletions spec/integration/acceptance_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@
cli = PushmiPullyu::CLI.instance
cli.parse(['-C', 'spec/fixtures/config.yml', '-W', workdir])

entity_json = JSON.parse(cli.send(:queue).wait_next_item)
entity = {
type: entity_json['type'],
uuid: entity_json['uuid']
}
entity = cli.send(:queue).wait_next_item

expect(entity[:uuid]).to eq uuid

Expand Down
55 changes: 50 additions & 5 deletions spec/pushmi_pullyu/cli_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require 'spec_helper'
require 'tempfile'
require 'timecop'

RSpec.describe PushmiPullyu::CLI do
let(:cli) { PushmiPullyu::CLI.instance }
Expand Down Expand Up @@ -213,14 +214,20 @@
expect(PushmiPullyu.options[:queue_name]).to eq 'test:pmpy_queue'
expect(PushmiPullyu.options[:swift][:auth_url]).to eq 'http://127.0.0.1:8080/auth/v1.0'
expect(PushmiPullyu.options[:rollbar][:token]).to eq 'abc123xyz'
expect(PushmiPullyu.options[:ingestion_prefix]).to eq 'prod:pmpy_ingest_attempt:'
expect(PushmiPullyu.options[:ingestion_attempts]).to eq 15
expect(PushmiPullyu.options[:first_failed_wait]).to eq 10
end

it 'still allows command line arguments to take precedence' do
cli.parse(['start',
'-C', 'spec/fixtures/config.yml',
'--logdir', 'path/to/random',
'--minimum-age', '5',
'--piddir', 'path/to/piddir'])
'--piddir', 'path/to/piddir',
'--ingestion_prefix', 'prefix',
'--ingestion_attempts', '20',
'--first_failed_wait', '20'])

expect(PushmiPullyu.options[:daemonize]).to be_truthy
expect(PushmiPullyu.options[:config_file]).to eq 'spec/fixtures/config.yml'
Expand All @@ -230,6 +237,9 @@
expect(PushmiPullyu.options[:piddir]).to eq 'path/to/piddir'
expect(PushmiPullyu.options[:process_name]).to eq 'test_pushmi_pullyu'
expect(PushmiPullyu.options[:minimum_age]).to be 5.0
expect(PushmiPullyu.options[:ingestion_prefix]).to eq 'prefix'
expect(PushmiPullyu.options[:ingestion_attempts]).to eq 20
expect(PushmiPullyu.options[:first_failed_wait]).to eq 20
end
end

Expand Down Expand Up @@ -273,11 +283,44 @@
expect(old_options).to eq new_options
end

it 'makes sure an entities information is readded to reddis when deposit fails' do
it 'delays the repeated attempts when deposits fail' do
# Lets substract 10 seconds to avoid waiting for the item to be processed
# expect((readded_entity_score.to_i - test_time).to_i).to eq 10
cli.parse(['-C', 'spec/fixtures/config_wrong_swift.yml'])
redis = Redis.new
redis.zadd(PushmiPullyu.options[:queue_name], 10,
'{"uuid": "123e4567-e89b-12d3-a456-426614174000", "type": "items"}')
entity = { uuid: '123e4567-e89b-12d3-a456-426614174000', type: 'items' }

start_time = Time.now - 10
attempt_key = "#{PushmiPullyu.options[:ingestion_prefix]}#{entity[:uuid]}"
redis.zadd(PushmiPullyu.options[:queue_name], start_time.to_f, entity.to_json)
deposit_attempt = 0
redis.set(attempt_key, deposit_attempt)
Timecop.freeze do
while deposit_attempt < PushmiPullyu.options[:ingestion_attempts]
VCR.use_cassette('aip_download_and_swift_upload') do
PushmiPullyu::Logging.logger.fatal!
cli.send(:run_preservation_cycle)
PushmiPullyu::Logging.initialize_logger
time_now = Time.now.to_i
_readded_entity, readded_entity_score = redis.zrange(PushmiPullyu.options[:queue_name],
0, 0, with_scores: true).first
deposit_attempt = redis.get(attempt_key).to_i
extra_wait_time = PushmiPullyu::PreservationQueue.extra_wait_time(deposit_attempt)
expect(readded_entity_score.to_i - time_now).to eq extra_wait_time
# We dont want to wait for defined minimum age so we add it to the time travel shenanigans
Timecop.travel(extra_wait_time + PushmiPullyu.options[:minimum_age])
end
end
end
end

it 'makes sure an entities information is readded to redis when deposit fails' do
cli.parse(['-C', 'spec/fixtures/config_wrong_swift.yml'])
redis = Redis.new
entity = { uuid: '123e4567-e89b-12d3-a456-426614174000', type: 'items' }

redis.zadd(PushmiPullyu.options[:queue_name], 10, entity.to_json)
redis.set("#{PushmiPullyu.options[:ingestion_prefix]}#{entity[:uuid]}", 0)

original_entity_information, original_entity_score = redis.zrange(PushmiPullyu.options[:queue_name],
0, 0, with_scores: true).first
Expand All @@ -291,10 +334,12 @@
PushmiPullyu::Logging.initialize_logger
readded_entity, readded_entity_score = redis.zrange(PushmiPullyu.options[:queue_name],
0, 0, with_scores: true).first
readded_attempt = redis.get("#{PushmiPullyu.options[:ingestion_prefix]}#{entity[:uuid]}")
expect(original_entity_information).to eq readded_entity
expect(original_entity_score).not_to eq readded_entity_score
expect(readded_attempt).to eq '1'
end
redis.del(PushmiPullyu.options[:queue_name])
redis.flushall
end
end
end
Expand Down
22 changes: 13 additions & 9 deletions spec/pushmi_pullyu/preservation_queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,24 @@

before do
PushmiPullyu.server_running = true
redis.zadd 'test:pmpy_queue', 1, 'noid1'
redis.zadd 'test:pmpy_queue', 3, 'noid3'
redis.zadd 'test:pmpy_queue', 4, 'noid2'
redis.zadd 'test:pmpy_queue', 10, 'noid1'
redis.zadd 'test:pmpy_queue', 1, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c1","type":"items"}'
redis.zadd 'test:pmpy_queue', 3, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c3","type":"items"}'
redis.zadd 'test:pmpy_queue', 4, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c2","type":"items"}'
redis.zadd 'test:pmpy_queue', 10, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c1","type":"items"}'
end

after do
redis.del 'test:pmpy_queue'
end

it 'retrieves 3 items in priority order' do
expect(queue.wait_next_item).to eq 'noid3'
expect(queue.wait_next_item).to eq 'noid2'
expect(queue.wait_next_item).to eq 'noid1'
next_item = queue.wait_next_item

expect(next_item[:uuid]).to eq '9e3be94f-a5de-4589-96ca-b18efba280c3'
next_item = queue.wait_next_item
expect(next_item[:uuid]).to eq '9e3be94f-a5de-4589-96ca-b18efba280c2'
next_item = queue.wait_next_item
expect(next_item[:uuid]).to eq '9e3be94f-a5de-4589-96ca-b18efba280c1'
end
end

Expand All @@ -32,7 +36,7 @@
let!(:redis) { Redis.new }

before do
redis.zadd 'test:pmpy_queue', Time.now.to_f, 'noid1'
redis.zadd 'test:pmpy_queue', Time.now.to_f, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c1","type":"items"}'
end

after do
Expand All @@ -50,7 +54,7 @@
expect(queue.next_item).to be_nil

Timecop.travel(now + 15.minutes)
expect(queue.next_item).to eq 'noid1'
expect(queue.next_item[:uuid]).to eq '9e3be94f-a5de-4589-96ca-b18efba280c1'
end
end
end

0 comments on commit 053c11a

Please sign in to comment.