Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adjust job-id params and namespace for sending files to s3 #1373

Merged
merged 3 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions lib/dor/text_extraction/speech_to_text.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ module Dor
module TextExtraction
# Determine if speech to text is required and possible for a given object
class SpeechToText
attr_reader :cocina_object, :workflow_context, :bare_druid, :logger
attr_reader :cocina_object, :workflow_context, :bare_druid

def initialize(cocina_object:, workflow_context: {}, logger: nil)
def initialize(cocina_object:, workflow_context: {})
@cocina_object = cocina_object
@workflow_context = workflow_context
@bare_druid = cocina_object.externalIdentifier.delete_prefix('druid:')
@logger = logger || Logger.new($stdout)
end
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this class never uses the logger, so remove for now unless we specifically need it later (it came as copypasta from the similar Ocr class)


def possible?
Expand Down Expand Up @@ -40,6 +39,16 @@ def filenames_to_stt
stt_files.map(&:filename)
end

# return the s3 location for a given filename
def s3_location(filename)
File.join(job_id, filename)
end

# return the job_id for the stt job, defined as the druid-version of the object
def job_id
"#{bare_druid}-v#{cocina_object.version}"
end

private

# iterate through cocina structural contains and return all File objects for files that need to be stt'd
Expand Down
8 changes: 4 additions & 4 deletions lib/robots/dor_repo/speech_to_text/fetch_files.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ def initialize

# available from LyberCore::Robot: druid, bare_druid, workflow_service, object_client, cocina_object, logger
def perform_work
sttable_filenames.each do |filename|
raise "Unable to fetch #{filename} for #{druid}" unless file_fetcher.write_file_with_retries(filename:, location: aws_provider.bucket.object(File.join(bare_druid, filename)), max_tries: 3)
speech_to_text.filenames_to_stt.each do |filename|
raise "Unable to fetch #{filename} for #{druid}" unless file_fetcher.write_file_with_retries(filename:, location: aws_provider.bucket.object(speech_to_text.s3_location(filename)), max_tries: 3)
end
end

private

def sttable_filenames
Dor::TextExtraction::SpeechToText.new(cocina_object:, workflow_context: workflow.context).filenames_to_stt
def speech_to_text
@speech_to_text ||= Dor::TextExtraction::SpeechToText.new(cocina_object:)
end

def file_fetcher
Expand Down
21 changes: 12 additions & 9 deletions lib/robots/dor_repo/speech_to_text/stt_create.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,30 @@ def perform_work
private

def send_sqs_message
message_body = {
id: job_id,
druid:,
media:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this got removed in the documentation, but afaict from the code and from testing locally yesterday evening, the media field is actually still required in the message body: https://github.com/sul-dlss/speech-to-text/blob/1fd0eb35b721349c75c75be5151dffc8564d4084/speech_to_text.py#L80-L85

i assume the removal from STT documentation was a mistake or a leftover from a discarded development path with the code. so adding that field back was one of the little touchups i was going to make (to the speech-to-text README) in the small follow-on PR i mentioned this morning at standup.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(or, maybe you and ed have discussed and this lines up with a coming change in the speech-to-text python worker?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I hadn't further discussed this was @edsu - was operating under the assumption that the whipser container would look for all files that matched the key for the message sent.

BUT... we could also continue to send the media key here in the message body, and it could basically do this for the container (i.e. look at what is in S3 for that job_id and then send all of those files in the message body)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, i could see either approach, and don't think i have a well thought out preference. seems like an easy adjustment either way... i might bias towards assuming for now that the current worker code is right... i just put up a small PR to that effect in speech-to-text: sul-dlss/speech-to-text#22

unless @edsu has this change to just list bucket contents in the works already?

i think we discussed this at the end of last week, but i can't remember if we settled on an approach or which way we decided if so.

i have to grab lunch after this DLSS all staff that's happening right now, and it'll be 5 eastern time once that's over anyway... maybe we can confer and decide (or remind me of last week's decision 😅) in the morning? seems like waiting till the morning shouldn't leave anyone seriously blocked this afternoon?

}.merge(whisper_options).to_json

# Send the message to the SQS queue
aws_provider.sqs.send_message({
queue_url: aws_provider.sqs_todo_queue_url,
message_body:
})

logger.info("Sent SQS message for druid #{druid} to queue #{aws_provider.sqs_todo_queue_url}")
logger.info("Sent SQS message for druid #{druid} to queue #{aws_provider.sqs_todo_queue_url} with job_id #{job_id}")
end

def message_body
{
id: job_id,
druid:,
media:
}.merge(whisper_options).to_json
end

def job_id
@job_id ||= SecureRandom.uuid
@job_id ||= Dor::TextExtraction::SpeechToText.new(cocina_object:).job_id
end

# array of media files in the bucket folder for this job (excluding s3 folders)
def media
Dor::TextExtraction::SpeechToText.new(cocina_object:, workflow_context: workflow.context).filenames_to_stt
aws_provider.client.list_objects(bucket: aws_provider.bucket_name, prefix: job_id).contents.map(&:key).reject { |key| key.end_with?('/') }
end

# pulled from config, could later be overriden by settings in the workflow context
Expand Down
19 changes: 19 additions & 0 deletions spec/lib/dor/text_extraction/speech_to_text_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
let(:text_file) { build_file(true, true, 'file1.txt') }
let(:text_file2) { build_file(true, true, 'file2.txt') }
let(:druid) { 'druid:bc123df4567' }
let(:bare_druid) { 'bc123df4567' }

def build_file(sdr_preserve, shelve, filename)
extension = File.extname(filename)
Expand Down Expand Up @@ -110,4 +111,22 @@ def build_file(sdr_preserve, shelve, filename)
expect(stt.send(:stt_files)).to eq([m4a_file, mp4_file])
end
end

describe '#s3_location' do
let(:cocina_object) { instance_double(Cocina::Models::DRO, version:, externalIdentifier: druid, dro?: true, type: object_type) }
let(:version) { 3 }

it 'returns the s3 filename key for a given filename' do
expect(stt.s3_location('text.xml')).to eq("#{bare_druid}-v#{version}/text.xml")
end
end

describe '#job_id' do
let(:cocina_object) { instance_double(Cocina::Models::DRO, version:, externalIdentifier: druid, dro?: true, type: object_type) }
let(:version) { 3 }

it 'returns the job_id for the STT job' do
expect(stt.job_id).to eq("#{bare_druid}-v#{version}")
end
end
end
13 changes: 8 additions & 5 deletions spec/robots/dor_repo/speech_to_text/fetch_files_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
let(:bare_druid) { 'bb222cc3333' }
let(:robot) { described_class.new }
let(:file_fetcher) { instance_double(Dor::TextExtraction::FileFetcher, write_file_with_retries: written) }
let(:stt) { instance_double(Dor::TextExtraction::SpeechToText, filenames_to_stt: ['file1.mov', 'file2.mp3']) }
let(:stt) { instance_double(Dor::TextExtraction::SpeechToText, job_id:, filenames_to_stt: ['file1.mov', 'file2.mp3']) }
let(:cocina_model) { build(:dro, id: druid).new(structural: {}, type: object_type, access: { view: 'world' }) }
let(:object_type) { 'https://cocina.sul.stanford.edu/models/media' }
let(:dsa_object_client) do
Expand All @@ -22,17 +22,20 @@
instance_double(Dor::Workflow::Response::Process, lane_id: 'lane1', context: { 'runSpeechToText' => true })
end
let(:aws_client) { instance_double(Aws::S3::Client) }
let(:mov_location) { instance_double(Aws::S3::Object, bucket_name: Settings.aws.speech_to_text.base_s3_bucket, key: "#{bare_druid}/file1.mov", client: aws_client) }
let(:mp3_location) { instance_double(Aws::S3::Object, bucket_name: Settings.aws.speech_to_text.base_s3_bucket, key: "#{bare_druid}/file2.mp3", client: aws_client) }
let(:mov_location) { instance_double(Aws::S3::Object, bucket_name: Settings.aws.speech_to_text.base_s3_bucket, key: "#{job_id}/file1.mov", client: aws_client) }
let(:mp3_location) { instance_double(Aws::S3::Object, bucket_name: Settings.aws.speech_to_text.base_s3_bucket, key: "#{job_id}/file2.mp3", client: aws_client) }
let(:job_id) { "#{bare_druid}-v1" }

before do
allow(Dor::Services::Client).to receive(:object).and_return(dsa_object_client)
allow(LyberCore::WorkflowClientFactory).to receive(:build).and_return(workflow_client)
allow(Dor::TextExtraction::FileFetcher).to receive(:new).and_return(file_fetcher)
allow(Dor::TextExtraction::SpeechToText).to receive(:new).and_return(stt)
allow(Aws::S3::Client).to receive(:new).and_return(aws_client)
allow(Aws::S3::Object).to receive(:new).with(bucket_name: Settings.aws.speech_to_text.base_s3_bucket, key: "#{bare_druid}/file1.mov", client: aws_client).and_return(mov_location)
allow(Aws::S3::Object).to receive(:new).with(bucket_name: Settings.aws.speech_to_text.base_s3_bucket, key: "#{bare_druid}/file2.mp3", client: aws_client).and_return(mp3_location)
allow(Aws::S3::Object).to receive(:new).with(bucket_name: Settings.aws.speech_to_text.base_s3_bucket, key: "#{job_id}/file1.mov", client: aws_client).and_return(mov_location)
allow(Aws::S3::Object).to receive(:new).with(bucket_name: Settings.aws.speech_to_text.base_s3_bucket, key: "#{job_id}/file2.mp3", client: aws_client).and_return(mp3_location)
allow(stt).to receive(:s3_location).with('file1.mov').and_return("#{job_id}/file1.mov")
allow(stt).to receive(:s3_location).with('file2.mp3').and_return("#{job_id}/file2.mp3")
end

context 'when fetching files is successful' do
Expand Down
15 changes: 11 additions & 4 deletions spec/robots/dor_repo/speech_to_text/stt_create_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
subject(:perform) { test_perform(robot, druid) }

let(:druid) { 'druid:bb222cc3333' }
let(:bare_druid) { 'bb222cc3333' }
let(:robot) { described_class.new }
let(:aws_client) { instance_double(Aws::SQS::Client) }
let(:stt) { instance_double(Dor::TextExtraction::SpeechToText, filenames_to_stt: ['file1.mov', 'file2.mp3']) }
let(:aws_s3_client) { instance_double(Aws::S3::Client) }
let(:stt) { instance_double(Dor::TextExtraction::SpeechToText, job_id:, filenames_to_stt: ['file1.mov', 'file2.mp3']) }
let(:cocina_model) { build(:dro, id: druid).new(structural: {}, type: object_type, access: { view: 'world' }) }
let(:object_type) { 'https://cocina.sul.stanford.edu/models/media' }
let(:dsa_object_client) do
Expand All @@ -20,18 +22,23 @@
let(:workflow_process) do
instance_double(Dor::Workflow::Response::Process, lane_id: 'lane1', context: { 'runSpeechToText' => true })
end
let(:job_id) { '1234-5678-0000' }
let(:job_id) { "#{bare_druid}-v1" }
let(:media) { ["#{job_id}/file1.mov", "#{job_id}/file2.mp3"] }
let(:list_objects) { instance_double(Aws::S3::Types::ListObjectsOutput, contents: [mov_object, mp3_object]) }
let(:mov_object) { instance_double(Aws::S3::Types::Object, key: media[0]) }
let(:mp3_object) { instance_double(Aws::S3::Types::Object, key: media[1]) }

before do
allow(Aws::S3::Client).to receive(:new).and_return(aws_s3_client)
allow(Aws::SQS::Client).to receive(:new).and_return(aws_client)
allow(Dor::Services::Client).to receive(:object).and_return(dsa_object_client)
allow(Dor::TextExtraction::SpeechToText).to receive(:new).and_return(stt)
allow(LyberCore::WorkflowClientFactory).to receive(:build).and_return(workflow_client)
allow(SecureRandom).to receive(:uuid).and_return(job_id)
allow(aws_s3_client).to receive(:list_objects).and_return(list_objects)
end

context 'when the message is sent successfully' do
let(:message_body) { { id: job_id, druid:, media: ['file1.mov', 'file2.mp3'], options: { model: 'large', max_line_count: 80, beam_size: 10 } }.to_json }
let(:message_body) { { id: job_id, druid:, media:, options: { model: 'large', max_line_count: 80, beam_size: 10 } }.to_json }

before do
allow(aws_client).to receive(:send_message).with({ queue_url: Settings.aws.speech_to_text.sqs_todo_queue_url, message_body: }).and_return(true)
Expand Down