Skip to content

Commit

Permalink
Merge pull request #448 from sul-dlss/kafka
Browse files Browse the repository at this point in the history
Read publish updates from Kafka
  • Loading branch information
thatbudakguy authored Apr 10, 2023
2 parents 72a8b69 + 800a2ab commit 9ae0264
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 190 deletions.
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ gem 'newrelic_rpm', group: :production
gem 'nokogiri'
gem 'okcomputer'
gem 'parallel'
gem 'purl_fetcher-client', '~> 0.3'
gem 'racecar', '~> 2.8'
gem 'redis', '~> 5.0'
gem 'redlock'
gem 'rsolr'
Expand Down
41 changes: 10 additions & 31 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ GEM
docile (1.4.0)
domain_name (0.5.20190701)
unf (>= 0.0.5, < 1.0.0)
dor-rights-auth (1.7.0)
nokogiri
dry-configurable (1.0.1)
dry-core (~> 1.0, < 2)
zeitwerk (~> 2.6)
Expand Down Expand Up @@ -151,8 +149,6 @@ GEM
dry-initializer (~> 3.0)
dry-schema (>= 1.12, < 2)
zeitwerk (~> 2.6)
edtf (3.1.1)
activesupport (>= 3.0, < 8.0)
erubi (1.12.0)
faraday (2.7.4)
faraday-net_http (>= 2.0, < 3.1)
Expand All @@ -178,8 +174,8 @@ GEM
io-console (0.6.0)
irb (1.6.4)
reline (>= 0.3.0)
iso-639 (0.3.6)
json (2.6.3)
king_konf (1.0.1)
llhttp-ffi (0.4.0)
ffi-compiler (~> 1.0)
rake (~> 13.0)
Expand All @@ -194,17 +190,9 @@ GEM
marcel (1.0.2)
method_source (1.0.0)
mini_mime (1.1.2)
mini_portile2 (2.8.1)
minitar (0.9)
minitest (5.18.0)
mods (3.0.3)
edtf (~> 3.0)
iso-639
nokogiri (>= 1.6.6)
nom-xml (~> 1.0)
mods_display (1.3.1)
i18n
stanford-mods (~> 3.3)
view_component
msgpack (1.7.0)
net-imap (0.3.4)
date
Expand All @@ -224,23 +212,17 @@ GEM
racc (~> 1.4)
nokogiri (1.14.2-x86_64-linux)
racc (~> 1.4)
nom-xml (1.2.0)
i18n
nokogiri
okcomputer (1.18.4)
parallel (1.22.1)
parser (3.2.2.0)
ast (~> 2.4.1)
public_suffix (5.0.1)
puma (5.6.5)
nio4r (~> 2.0)
purl_fetcher-client (0.5.0)
dor-rights-auth
http
mods_display (>= 1.0.0.alpha1)
nokogiri
stanford-mods
racc (1.6.2)
racecar (2.8.2)
king_konf (~> 1.0.0)
rdkafka (~> 0.12.0)
rack (2.2.6.4)
rack-test (2.1.0)
rack (>= 1.3)
Expand Down Expand Up @@ -272,6 +254,10 @@ GEM
zeitwerk (~> 2.5)
rainbow (3.1.1)
rake (13.0.6)
rdkafka (0.12.0)
ffi (~> 1.15)
mini_portile2 (~> 2.6)
rake (> 12)
redis (5.0.6)
redis-client (>= 0.9.0)
redis-client (0.14.1)
Expand Down Expand Up @@ -348,9 +334,6 @@ GEM
sshkit (1.21.4)
net-scp (>= 1.1.2)
net-ssh (>= 2.8.0)
stanford-mods (3.3.2)
activesupport
mods (~> 3.0, >= 3.0.3)
thor (1.2.1)
timeout (0.3.2)
tzinfo (2.0.6)
Expand All @@ -359,10 +342,6 @@ GEM
unf_ext
unf_ext (0.0.8.2)
unicode-display_width (2.4.2)
view_component (2.82.0)
activesupport (>= 5.2.0, < 8.0)
concurrent-ruby (~> 1.0)
method_source (~> 1.0)
websocket-driver (0.7.5)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.5)
Expand Down Expand Up @@ -392,7 +371,7 @@ DEPENDENCIES
okcomputer
parallel
puma (~> 5.0)
purl_fetcher-client (~> 0.3)
racecar (~> 2.8)
rails (~> 7.0.0)
redis (~> 5.0)
redlock
Expand Down
14 changes: 14 additions & 0 deletions app/consumers/publish_consumer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

# Harvest changed content from kafka
class PublishConsumer < Racecar::Consumer
subscribes_to Settings.kafka.topic
# Set group_id differently in prod and uat, so they can both receive the message
self.group_id = Settings.kafka.group_id

def process(message)
data = JSON.parse(message.value)

Search.client.delete_by_query("druid:#{data['druid']}", params: { commit: true })
end
end
8 changes: 0 additions & 8 deletions app/jobs/delete_content_from_index_job.rb

This file was deleted.

2 changes: 1 addition & 1 deletion app/jobs/garbage_collect_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
class GarbageCollectJob < ApplicationJob
def perform
response['response']['docs'].each do |doc|
DeleteContentFromIndexJob.perform_now(doc['druid'])
Search.client.delete_by_query("druid:#{doc['druid']}", params: { commit: true })
end
end

Expand Down
61 changes: 0 additions & 61 deletions app/jobs/harvest_purl_fetcher_job.rb

This file was deleted.

6 changes: 5 additions & 1 deletion config/deploy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# set :pty, true

# Default value for :linked_files is []
set :linked_files, ->{ ["config/master.key", "config/honeybadger.yml", "config/newrelic.yml", "tmp/harvest_purl_fetcher_job_last_run_#{fetch(:rails_env)}"] }
set :linked_files, ->{ ["config/master.key", "config/honeybadger.yml", "config/newrelic.yml"] }

# Default value for linked_dirs is []
append :linked_dirs, "log", "tmp/pids", "tmp/cache", "tmp/sockets", "public/system", "config/settings"
Expand All @@ -45,6 +45,10 @@

set :whenever_roles, [:indexer]

# Manage racecar via systemd (from dlss-capistrano gem)
set :racecar_systemd_role, :indexer
set :racecar_systemd_use_hooks, true

namespace :deploy do
after :restart, :restart_sidekiq do
on roles(:app) do
Expand Down
5 changes: 0 additions & 5 deletions config/schedule.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
job_type :runner, "cd :path && :environment_variable=:environment bin/rails runner -e :environment ':task' :output"

# index + delete SDR
every '*/15 * * * *' do
runner 'HarvestPurlFetcherJob.perform_now'
end

# Garbage collect the index
every '15 * * * *' do
runner 'GarbageCollectJob.perform_now'
Expand Down
4 changes: 3 additions & 1 deletion config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ solr:
suggest_params:
'suggest.count': 1000

purl_fetcher: {}
kafka:
topic: testing_topic # Can be purl_fetcher_stage or purl_fetcher_prod
group_id: content-search
21 changes: 21 additions & 0 deletions spec/consumers/publish_consumer_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true

require 'rails_helper'

RSpec.describe PublishConsumer do
let(:consumer) { described_class.new }

describe '#process' do
before do
allow(Search).to receive(:client).and_return(instance_double(RSolr::Client, delete_by_query: nil))
consumer.process(message)
end

let(:message) { instance_double(Racecar::Message, value: message_value) }
let(:message_value) { { druid: 'druid:123' }.to_json }

it 'creates a delete content job' do
expect(Search.client).to have_received(:delete_by_query).with('druid:druid:123', params: { commit: true })
end
end
end
13 changes: 0 additions & 13 deletions spec/jobs/delete_content_from_index_job_spec.rb

This file was deleted.

68 changes: 0 additions & 68 deletions spec/jobs/harvest_purl_fetcher_job_spec.rb

This file was deleted.

0 comments on commit 9ae0264

Please sign in to comment.