Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cache): use vary headers to compare cached response with request headers #3

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 126 additions & 26 deletions lib/interceptor/cache.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import assert from 'node:assert'
import { LRUCache } from 'lru-cache'
import { DecoratorHandler, parseHeaders, parseCacheControl } from '../utils.js'
import { DatabaseSync } from 'node:sqlite' // --experimental-sqlite

class CacheHandler extends DecoratorHandler {
#handler
#store
#key
#opts
#value = null

constructor({ key, handler, store }) {
constructor({ key, handler, store, opts = [] }) {
super(handler)

this.#key = key
this.#handler = handler
this.#store = store
this.#opts = opts
}

onConnect(abort) {
Expand Down Expand Up @@ -64,11 +66,10 @@ class CacheHandler extends DecoratorHandler {
(rawHeaders?.reduce((xs, x) => xs + x.length, 0) ?? 0) +
(statusMessage?.length ?? 0) +
64,
ttl: ttl * 1e3,
ttl, // in ms!
}
}
}

return this.#handler.onHeaders(statusCode, rawHeaders, resume, statusMessage, headers)
}

Expand All @@ -88,37 +89,123 @@ class CacheHandler extends DecoratorHandler {

onComplete(rawTrailers) {
if (this.#value) {
const resHeaders = parseHeaders(this.#value.data.rawHeaders)
const reqHeaders = this.#opts

this.#value.data.rawTrailers = rawTrailers
this.#value.size += rawTrailers?.reduce((xs, x) => xs + x.length, 0) ?? 0
this.#store.set(this.#key, this.#value.data, { ttl: this.#value.ttl, size: this.#value.size })
this.#value.size = this.#value.size
? this.#value.size + rawTrailers?.reduce((xs, x) => xs + x.length, 0)
: 0
this.#value.vary = formatVaryData(resHeaders, reqHeaders)
IsakT marked this conversation as resolved.
Show resolved Hide resolved

this.#store.set(this.#key, this.#value)
}
return this.#handler.onComplete(rawTrailers)
}
}

// TODO (fix): Async filesystem cache.
/**
 * Build the `vary` data stored with a cached response: one
 * `[headerName, requestValue]` pair for every header named in the response's
 * `Vary` header that the request actually carried.
 *
 * @param {object} resHeaders Parsed response headers; only `vary` is read.
 * @param {object} reqHeaders Either the dispatch options object (request
 *   headers live under `.headers`) or a flat map of request headers.
 * @returns {Array<[string, string]>|undefined} Lower-cased header names paired
 *   with the request's value, or `undefined` when the response has no `Vary`.
 */
function formatVaryData(resHeaders, reqHeaders) {
  const vary = resHeaders?.vary
  if (vary == null) {
    return undefined
  }

  // The handler passes the whole dispatch options object, so the request
  // headers are nested under `.headers`. Looking names up on the options
  // object itself never matches, which would leave every entry without vary
  // data. A flat header map is still accepted for direct callers.
  const nested = reqHeaders?.headers
  const headers = nested !== null && typeof nested === 'object' ? nested : (reqHeaders ?? {})

  // NOTE(review): a repeated Vary header may be parsed into an array of
  // values — joined here so both shapes are handled; confirm against parseHeaders.
  const names = (Array.isArray(vary) ? vary.join(',') : String(vary)).split(',')

  return names
    .map((name) => name.trim().toLowerCase())
    .map((name) => [name, headers[name]])
    .filter(([, value]) => value) // keep only headers the request actually sent
}

// Can we move this class somewhere else, to the util.js file or its own module?
class CacheStore {
constructor({ maxSize = 1024 * 1024, maxEntrySize = 128 * 1024 }) {
this.maxSize = maxSize
this.maxEntrySize = maxEntrySize
this.cache = new LRUCache({ maxSize })
constructor() {
this.database = null
this.init()
}

init() {
this.database = new DatabaseSync('file:memdb1?mode=memory&cache=shared')

this.database.exec(`
CREATE TABLE IF NOT EXISTS cacheInterceptor(
key TEXT,
data TEXT,
vary TEXT,
size INTEGER,
ttl INTEGER,
insertTime INTEGER
IsakT marked this conversation as resolved.
Show resolved Hide resolved
) STRICT
`)
}

set(key, value, opts) {
this.cache.set(key, value, opts)
set(key, entry, opts) {
if (!this.database) {
throw new Error('Database not initialized')
}

entry.data = JSON.stringify(entry.data)
entry.vary = JSON.stringify(entry.vary)

const insert = this.database.prepare(
'INSERT INTO cacheInterceptor (key, data, vary, size, ttl, insertTime) VALUES (?, ?, ?, ?, ?, ?)',
)

insert.run(key, entry.data, entry.vary, entry.size, entry.ttl, Date.now())

this.purge()
}

get(key) {
return this.cache.get(key)
if (!this.database) {
throw new Error('Database not initialized')
}
this.purge()
const query = this.database.prepare('SELECT * FROM cacheInterceptor WHERE key = ?')
const rows = query.all(key)
rows.map((i) => {
i.data = JSON.parse(i.data)
i.vary = JSON.parse(i.vary)
return i
})

// Just in case purge hasn't finished
const nonExpiredRows = rows.filter((i) => i.insertTime + i.ttl > Date.now())

return nonExpiredRows
}

purge() {
const query = this.database.prepare('DELETE FROM cacheInterceptor WHERE insertTime + ttl < ?')
query.run(Date.now())
}

deleteAll() {
const query = this.database.prepare('DELETE FROM cacheInterceptor')
IsakT marked this conversation as resolved.
Show resolved Hide resolved
query.run()
}
}

function makeKey(opts) {
// NOTE: Ignores headers...
return `${opts.origin}:${opts.method}:${opts.path}`
/**
 * Sort cache entries in place by their number of vary headers, descending.
 *
 * The entry with the most vary headers has to be compared with the request
 * first: an entry with an empty `vary` field matches any request and would
 * otherwise win every time.
 *
 * @param {Array<{ vary?: Array }>|null|undefined} entries Entries to reorder;
 *   a nullish value is ignored so callers need not pre-check a store miss.
 * @returns {void}
 */
function sortEntriesByVary(entries) {
  if (entries == null) {
    return
  }

  const varyCount = (entry) => entry?.vary?.length ?? 0

  entries.sort((a, b) => varyCount(b) - varyCount(a))
}
ronag marked this conversation as resolved.
Show resolved Hide resolved

const DEFAULT_CACHE_STORE = new CacheStore({ maxSize: 128 * 1024, maxEntrySize: 1024 })
/**
 * Pick the cached entry whose stored vary headers all match the request.
 *
 * Entries are first ordered via `sortEntriesByVary` (which reorders the given
 * array in place), then the first entry for which every stored
 * `[header, value]` pair equals the request's header value is returned. An
 * entry without vary data matches any request.
 *
 * @param {Array<{ vary?: Array<[string, string]> }>|null|undefined} entries
 *   Candidate entries for one cache key.
 * @param {{ headers?: object }} reqHeaders Dispatch options of the request;
 *   request headers are read from its `headers` property.
 * @returns {object|undefined} The matching entry, or `undefined` if none.
 */
function findEntryByHeaders(entries, reqHeaders) {
  // A store may report a miss as null/undefined; there is nothing to sort or
  // search in that case.
  if (entries == null) {
    return undefined
  }

  sortEntriesByVary(entries)

  // Requests without a `headers` object simply match no vary pair instead of
  // throwing on the property lookup.
  const headers = reqHeaders?.headers ?? {}

  return entries.find(
    (entry) => entry.vary?.every(([key, val]) => headers[key] === val) ?? true,
  )
}

const DEFAULT_CACHE_STORE = new CacheStore()

export default (opts) => (dispatch) => (opts, handler) => {
if (!opts.cache || opts.upgrade) {
Expand Down Expand Up @@ -149,22 +236,32 @@ export default (opts) => (dispatch) => (opts, handler) => {
// Dump body...
opts.body?.on('error', () => {}).resume()

opts.host = opts.host ?? new URL(opts.origin).host

if (!opts.headers) {
opts.headers = {}
}

// idea: use DEFAULT_CACHE_STORE by default if 'cache' not specified, since the cache interceptor was already specified to be used.
const store = opts.cache === true ? DEFAULT_CACHE_STORE : opts.cache

if (!store) {
throw new Error(`Cache store not provided.`)
}

let key = makeKey(opts)
let value = store.get(key)
let key = `${opts.method}:${opts.path}`

if (value == null && opts.method === 'HEAD') {
key = makeKey({ ...opts, method: 'GET' })
value = store.get(key)
let entries = store.get(key)
IsakT marked this conversation as resolved.
Show resolved Hide resolved

if (Array.isArray(entries) && entries.length === 0 && opts.method === 'HEAD') {
key = `GET:${opts.path}`
entries = store.get(key)
}

if (value) {
const { statusCode, statusMessage, rawHeaders, rawTrailers, body } = value
const entry = findEntryByHeaders(entries, opts)

if (entry) {
const { statusCode, statusMessage, rawHeaders, rawTrailers, body } = entry.data
const ac = new AbortController()
const signal = ac.signal

Expand All @@ -176,11 +273,14 @@ export default (opts) => (dispatch) => (opts, handler) => {
try {
handler.onConnect(abort)
signal.throwIfAborted()

handler.onHeaders(statusCode, rawHeaders, resume, statusMessage)
signal.throwIfAborted()

if (opts.method !== 'HEAD') {
for (const chunk of body) {
const ret = handler.onData(chunk)

signal.throwIfAborted()
if (ret === false) {
// TODO (fix): back pressure...
Expand All @@ -196,6 +296,6 @@ export default (opts) => (dispatch) => (opts, handler) => {

return true
} else {
return dispatch(opts, new CacheHandler({ handler, store, key: makeKey(opts) }))
return dispatch(opts, new CacheHandler({ handler, store, key, opts }))
}
}
3 changes: 3 additions & 0 deletions lib/interceptor/proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class Handler extends DecoratorHandler {
}

onUpgrade(statusCode, rawHeaders, socket) {
console.log('Proxy onUpgrade')
return this.#handler.onUpgrade(
statusCode,
reduceHeaders(
Expand All @@ -34,6 +35,7 @@ class Handler extends DecoratorHandler {
}

onHeaders(statusCode, rawHeaders, resume, statusMessage) {
console.log('Proxy onHeaders')
return this.#handler.onHeaders(
statusCode,
reduceHeaders(
Expand Down Expand Up @@ -164,6 +166,7 @@ function printIp(address, port) {
}

export default (opts) => (dispatch) => (opts, handler) => {
console.log('Proxy default dispatch')
if (!opts.proxy) {
return dispatch(opts, handler)
}
Expand Down
3 changes: 3 additions & 0 deletions lib/interceptor/redirect.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Handler extends DecoratorHandler {
}

onConnect(abort) {
console.log('Redirect onConnect')
if (this.#aborted) {
abort(this.#reason)
} else {
Expand All @@ -48,6 +49,7 @@ class Handler extends DecoratorHandler {
}

onHeaders(statusCode, rawHeaders, resume, statusText, headers = parseHeaders(rawHeaders)) {
console.log('Redirect onHeaders')
if (redirectableStatusCodes.indexOf(statusCode) === -1) {
assert(!this.#headersSent)
this.#headersSent = true
Expand Down Expand Up @@ -109,6 +111,7 @@ class Handler extends DecoratorHandler {
}

onData(chunk) {
console.log('Redirect onData')
if (this.#location) {
/*
https://tools.ietf.org/html/rfc7231#section-6.4
Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
"cache-control-parser": "^2.0.6",
"cacheable-lookup": "^7.0.0",
"http-errors": "^2.0.0",
"lru-cache": "^11.0.0",
"undici": "^6.19.5"
},
"devDependencies": {
Expand Down
Loading