Skip to content

Commit

Permalink
Merge pull request #6 from Banou26/cancellable-2
Browse files Browse the repository at this point in the history
Cancellable operations
  • Loading branch information
Banou26 authored Feb 17, 2024
2 parents bf4f252 + 9af8894 commit 7e76d3a
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 163 deletions.
10 changes: 10 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"concurrently": "^7.6.0",
"copyfiles": "^2.4.1",
"nodemon": "^2.0.22",
"p-debounce": "^4.0.0",
"shx": "^0.3.4",
"typescript": "^4.9.4",
"vite": "^4.0.4"
Expand Down
97 changes: 39 additions & 58 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { toStreamChunkSize } from './utils'
import type { Resolvers as WorkerResolvers } from './worker'

import { call } from 'osra'
Expand All @@ -8,18 +9,9 @@ export type MakeTransmuxerOptions = {
/** Path that will be used to locate the javascript worker file */
workerUrl: string
workerOptions?: WorkerOptions
randomRead: (offset: number, size: number) => Promise<ArrayBuffer>
getStream: (offset: number) => Promise<ReadableStream<Uint8Array>>
getStream: (offset: number, size?: number) => Promise<ReadableStream<Uint8Array>>
subtitle: (title: string, language: string, data: string) => Promise<void> | void
attachment: (filename: string, mimetype: string, buffer: ArrayBuffer) => Promise<void> | void
write: (params: {
isHeader: boolean,
offset: number,
buffer: Uint8Array,
pos: number,
pts: number,
duration: number
}) => Promise<void> | void
length: number
bufferSize: number
}
Expand Down Expand Up @@ -94,11 +86,9 @@ export const makeTransmuxer = async ({
const subtitles = new Map<number, Subtitle>()

let currentStream: ReadableStream<Uint8Array> | undefined
let currentStreamOffset: number | undefined
let reader: ReadableStreamDefaultReader<Uint8Array> | undefined

let streamResultPromiseResolve: (value: { value: ArrayBuffer | undefined, done: boolean, cancelled: boolean }) => void
let streamResultPromiseReject: (reason?: any) => void
let streamResultPromise: Promise<{ value: ArrayBuffer | undefined, done: boolean, cancelled: boolean }>

const { init: workerInit, destroy: workerDestroy, read: workerRead, seek: workerSeek, getInfo: getInfo } =
await target(
Expand Down Expand Up @@ -148,58 +138,36 @@ export const makeTransmuxer = async ({
_subtitle(subtitle.title, subtitle.language, subtitleString)
},
attachment: async (filename, mimetype, buffer) => attachment(filename, mimetype, buffer),
randomRead: (offset, bufferSize) => _randomRead(offset, bufferSize),
randomRead: async (offset, bufferSize) => {
const stream = toStreamChunkSize(bufferSize)(await _getStream(offset, bufferSize))
const reader = stream.getReader()
const { value, done } = await reader.read()
reader.cancel()
return value?.buffer ?? new ArrayBuffer(0)
},
streamRead: async (offset: number) => {
if (!currentStream) {
if (
!currentStream ||
(currentStreamOffset && currentStreamOffset + bufferSize !== offset)
) {
reader?.cancel()

currentStream = await _getStream(offset)
reader = currentStream.getReader()
}

streamResultPromise = new Promise<{ value: ArrayBuffer | undefined, done: boolean, cancelled: boolean }>((resolve, reject) => {
streamResultPromiseResolve = resolve
streamResultPromiseReject = reject
})

const tryReading = (): Promise<void> | undefined =>
reader
?.read()
.then(result => ({
value: result.value?.buffer,
done: result.value === undefined,
cancelled: false
}))
.then(async (result) => {
if (result.done) {
reader?.cancel()
if (offset >= length) {
return streamResultPromiseResolve(result)
}
currentStream = await _getStream(offset)
reader = currentStream.getReader()
return tryReading()
}

return streamResultPromiseResolve(result)
})
.catch((err) => streamResultPromiseReject(err))

tryReading()

if (!reader) throw new Error('No reader found')

currentStreamOffset = offset

return (
streamResultPromise
.then((value) => ({
value: value.value,
done: value.done,
reader
.read()
.then(({ value, done }) => ({
buffer: value?.buffer,
done,
cancelled: false
}))
.catch(err => {
console.error(err)
return {
value: undefined,
done: false,
cancelled: true
}
})
)
},
clearStream: async () => {
Expand Down Expand Up @@ -234,8 +202,21 @@ export const makeTransmuxer = async ({
}
return workerDestroy()
},
read: () => workerRead(),
read: () =>{
// console.log('read')
return workerRead()
},
seek: (time: number) => workerSeek(Math.max(0, time) * 1000),
// seek: (time: number) => {
// console.log('seek', streamResultPromiseReject)
// if (streamResultPromiseReject) {
// console.log('cancel seek')
// streamResultPromiseReject(new Error('Seeking to a new position.'))
// reader?.cancel()
// console.log('cancelled seek')
// }
// return workerSeek(Math.max(0, time) * 1000)
// },
getInfo: () => getInfo() as Promise<{ input: MediaInfo, output: MediaInfo }>

Check failure on line 220 in src/index.ts

View workflow job for this annotation

GitHub Actions / build

Conversion of type '{ input: { formatName: string; mimeType: string; duration: number; video_mime_type: string; audio_mime_type: string; }; output: { formatName: string; mimeType: string; duration: number; video_mime_type: string; audio_mime_type: string; }; }' to type 'Promise<{ input: MediaInfo; output: MediaInfo; }>' may be a mistake because neither type sufficiently overlaps with the other. If this was intentional, convert the expression to 'unknown' first.

Check failure on line 220 in src/index.ts

View workflow job for this annotation

GitHub Actions / build

Conversion of type '{ input: { formatName: string; mimeType: string; duration: number; video_mime_type: string; audio_mime_type: string; }; output: { formatName: string; mimeType: string; duration: number; video_mime_type: string; audio_mime_type: string; }; }' to type 'Promise<{ input: MediaInfo; output: MediaInfo; }>' may be a mistake because neither type sufficiently overlaps with the other. If this was intentional, convert the expression to 'unknown' first.
}

Expand Down
55 changes: 47 additions & 8 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ extern "C" {
double pts;
double duration;
bool first_frame = true;
bool cancelling = false;

std::string video_mime_type;
std::string audio_mime_type;
Expand All @@ -83,6 +84,7 @@ extern "C" {
val write = val::undefined();
val flush = val::undefined();
val error = val::undefined();
val exit = val::undefined();

double currentOffset = 0;

Expand Down Expand Up @@ -122,6 +124,7 @@ extern "C" {
write = options["write"];
flush = options["flush"];
error = options["error"];
exit = options["exit"];
}

auto decimalToHex(int d, int padding) {
Expand Down Expand Up @@ -471,6 +474,9 @@ extern "C" {
AVPacket* packet = av_packet_alloc();

if ((res = av_read_frame(input_format_context, packet)) < 0) {
// free packet
av_packet_unref(packet);
av_packet_free(&packet);
if (res == AVERROR_EOF) {
avio_flush(output_format_context->pb);
is_flushing = true;
Expand All @@ -483,6 +489,11 @@ extern "C" {
);
// destroy();
break;
} else if (res == AVERROR_EXIT) {
cancelling = false;
// printf("ERROR: could not read frame, exit requested | %s \n", av_err2str(res));
exit();
break;
}
printf("ERROR: could not read frame | %s \n", av_err2str(res));
break;
Expand Down Expand Up @@ -635,10 +646,14 @@ extern "C" {
pts = 0;
pos = 0;
if ((res = av_seek_frame(input_format_context, video_stream_index, timestamp, AVSEEK_FLAG_BACKWARD)) < 0) {
seeking = false;
first_seek = false;
printf("ERROR: could not seek frame | %s \n", av_err2str(res));
return 1;
}
seeking = false;
first_seek = false;
cancelling = false;
return 0;
}

Expand Down Expand Up @@ -691,16 +706,30 @@ extern "C" {
Remuxer &remuxObject = *reinterpret_cast<Remuxer*>(opaque);
std::string buffer;

if (remuxObject.cancelling) {
remuxObject.promise.await();
return AVERROR_EXIT;
}

if (remuxObject.initializing) {
emscripten::val &randomRead = remuxObject.randomRead;
if (remuxObject.first_init) {
buffer =
emscripten::val result =
randomRead(
to_string(remuxObject.input_format_context->pb->pos),
buf_size
)
.await()
.as<std::string>();
.await();
bool is_cancelled = result["cancelled"].as<bool>();
remuxObject.cancelling = is_cancelled;
if (is_cancelled) {
return AVERROR_EXIT;
}
bool is_done = result["done"].as<bool>();
if (is_done) {
return AVERROR_EOF;
}
buffer = result["buffer"].as<std::string>();
remuxObject.init_buffers.push_back(buffer);
} else {
remuxObject.promise.await();
Expand All @@ -710,13 +739,22 @@ extern "C" {
} else if(remuxObject.seeking) {
emscripten::val &randomRead = remuxObject.randomRead;
if (remuxObject.first_seek) {
buffer =
emscripten::val result =
randomRead(
to_string(remuxObject.input_format_context->pb->pos),
buf_size
)
.await()
.as<std::string>();
.await();
bool is_cancelled = result["cancelled"].as<bool>();
remuxObject.cancelling = is_cancelled;
if (is_cancelled) {
return AVERROR_EXIT;
}
bool is_done = result["done"].as<bool>();
if (is_done) {
return AVERROR_EOF;
}
buffer = result["buffer"].as<std::string>();
remuxObject.seek_buffers.push_back(buffer);
} else {
remuxObject.promise.await();
Expand All @@ -732,14 +770,15 @@ extern "C" {
)
.await();
bool is_cancelled = result["cancelled"].as<bool>();
remuxObject.cancelling = is_cancelled;
if (is_cancelled) {
return AVERROR_EXIT;
}
bool is_done = result["done"].as<bool>();
if (is_done) {
return AVERROR_EOF;
}
buffer = result["value"].as<std::string>();
buffer = result["buffer"].as<std::string>();
}

int buffer_size = buffer.size();
Expand Down Expand Up @@ -772,7 +811,7 @@ extern "C" {
buf
)
)
).await();
);

return buf_size;
}
Expand Down
Loading

0 comments on commit 7e76d3a

Please sign in to comment.