Skip to content

Commit

Permalink
feat: Race top n nodes when making requests (#25)
Browse files Browse the repository at this point in the history
* feat: implement a nodes list for clients

* feat: implement a storage interface using indexedDb

* feat: implement a test suite for fallback

* fix: remove unused code

* fix: eslint an jsdoc

* fix: formatting and consistency

* fix: indexDbCheck

* chore: change storage implementation

* enhancement: simplify node loading

* naive fallback implementation

* modify fallback

* fix formatting and typos

* typos

* Update .eslintrc

Co-authored-by: Diego Rodríguez Baquero <[email protected]>

* enhancement: edit storage impl

* enhancement: deal with overlapping byte chunks

* feat: add fallback test suite

* fix: tests running

* cleanup content fetch with fallback

* add initial origin fetch to fallback

* formatting and file re-org

* feat: merge main into fallback branch (#22)

* Abort on error (#19)

* feat: use controller from options if exists. abort fetch if error occurs.

* test: check if external abort controller is used

* build: move build output to dist/ folder

* fix: newline

* 0.1.1

* Build exports (#20)

* chore: rename file

* feat: add new entrypoint with exports.

Switch Saturn to named export

* build: expose entire module instead of just the default export

* docs: update README

* 0.2.0

* feat: include worker scopes when checking for browser runtime (#21)

* 0.3.0

---------

Co-authored-by: Eric Guan <[email protected]>

* load nodes on first success

* add fallback limit

* fix: fallback bug

* put eslint settings in package.json

* add nodesListKey as static

* fix: resolve process in browser

* feat: add fetching with a race

* enhancement: add backward compatibility for racing

* tests and cleanup

* fixes and enhancements

* add typings

* add typings

---------

Co-authored-by: Diego Rodríguez Baquero <[email protected]>
Co-authored-by: Eric Guan <[email protected]>
  • Loading branch information
3 people authored Oct 26, 2023
1 parent 5ef32b0 commit 3b4e8db
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 23 deletions.
154 changes: 140 additions & 14 deletions src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ import { parseUrl, addHttpPrefix } from './utils/url.js'
import { isBrowserContext } from './utils/runtime.js'

const MAX_NODE_WEIGHT = 100
/**
* @typedef {import('./types.js').Node} Node
*/

export class Saturn {
static nodesListKey = 'saturn-nodes'
static defaultRaceCount = 3
/**
*
* @param {object} [opts={}]
Expand Down Expand Up @@ -53,6 +57,93 @@ export class Saturn {
this.loadNodesPromise = this._loadNodes(this.opts)
}

/**
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @returns {Promise<object>}
*/
async fetchCIDWithRace (cidPath, opts = {}) {
const [cid] = (cidPath ?? '').split('/')
CID.parse(cid)

const jwt = await getJWT(this.opts, this.storage)

const options = Object.assign({}, this.opts, { format: 'car', jwt }, opts)

if (!isBrowserContext) {
options.headers = {
...(options.headers || {}),
Authorization: 'Bearer ' + options.jwt
}
}

const origins = options.origins
const controllers = []

const createFetchPromise = async (origin) => {
const fetchOptions = { ...options, url: origin }
const url = this.createRequestURL(cidPath, fetchOptions)

const controller = new AbortController()
controllers.push(controller)
const connectTimeout = setTimeout(() => {
controller.abort()
}, options.connectTimeout)

try {
res = await fetch(parseUrl(url), { signal: controller.signal, ...options })
clearTimeout(connectTimeout)
return { res, url, controller }
} catch (err) {
throw new Error(
`Non OK response received: ${res.status} ${res.statusText}`
)
}
}

const abortRemainingFetches = async (successController, controllers) => {
return controllers.forEach((controller) => {
if (successController !== controller) {
controller.abort('Request race unsuccessful')
}
})
}

const fetchPromises = Promise.any(origins.map((origin) => createFetchPromise(origin)))

let log = {
startTime: new Date()
}

let res, url, controller
try {
({ res, url, controller } = await fetchPromises)

abortRemainingFetches(controller, controllers)
log = Object.assign(log, this._generateLog(res, log), { url })

if (!res.ok) {
throw new Error(
`Non OK response received: ${res.status} ${res.statusText}`
)
}
} catch (err) {
if (!res) {
log.error = err.message
}
// Report now if error, otherwise report after download is done.
this._finalizeLog(log)

throw err
}

return { res, controller, log }
}

/**
*
* @param {string} cidPath
Expand All @@ -70,8 +161,7 @@ export class Saturn {

const options = Object.assign({}, this.opts, { format: 'car', jwt }, opts)
const url = this.createRequestURL(cidPath, options)

const log = {
let log = {
url,
startTime: new Date()
}
Expand All @@ -93,13 +183,7 @@ export class Saturn {

clearTimeout(connectTimeout)

const { headers } = res
log.ttfbMs = new Date() - log.startTime
log.httpStatusCode = res.status
log.cacheHit = headers.get('saturn-cache-status') === 'HIT'
log.nodeId = headers.get('saturn-node-id')
log.requestId = headers.get('saturn-transfer-id')
log.httpProtocol = headers.get('quic-status')
log = Object.assign(log, this._generateLog(res, log))

if (!res.ok) {
throw new Error(
Expand All @@ -119,11 +203,32 @@ export class Saturn {
return { res, controller, log }
}

/**
* @param {Response} res
* @param {object} log
* @returns {object}
*/
_generateLog (res, log) {
const { headers } = res
log.httpStatusCode = res.status
log.cacheHit = headers.get('saturn-cache-status') === 'HIT'
log.nodeId = headers.get('saturn-node-id')
log.requestId = headers.get('saturn-transfer-id')
log.httpProtocol = headers.get('quic-status')

if (res.ok) {
log.ttfbMs = new Date() - log.startTime
}

return log
}

/**
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {boolean} [opts.raceNodes]
* @param {string} [opts.url]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
Expand Down Expand Up @@ -168,11 +273,18 @@ export class Saturn {
}

let fallbackCount = 0
for (const origin of this.nodes) {
const nodes = this.nodes
for (let i = 0; i < nodes.length; i++) {
if (fallbackCount > this.opts.fallbackLimit) {
return
}
opts.url = origin.url
if (opts.raceNodes) {
const origins = nodes.slice(i, i + Saturn.defaultRaceCount).map((node) => node.url)
opts.origins = origins
} else {
opts.url = nodes[i].url
}

try {
yield * fetchContent()
return
Expand All @@ -191,13 +303,20 @@ export class Saturn {
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {('car'|'raw')} [opts.format]- -
* @param {boolean} [opts.raceNodes]- -
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @returns {Promise<AsyncIterable<Uint8Array>>}
*/
async * fetchContent (cidPath, opts = {}) {
const { res, controller, log } = await this.fetchCID(cidPath, opts)
let res, controller, log

if (opts.raceNodes) {
({ res, controller, log } = await this.fetchCIDWithRace(cidPath, opts))
} else {
({ res, controller, log } = await this.fetchCID(cidPath, opts))
}

async function * metricsIterable (itr) {
log.numBytesSent = 0
Expand Down Expand Up @@ -226,6 +345,7 @@ export class Saturn {
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {boolean} [opts.raceNodes]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @returns {Promise<Uint8Array>}
Expand All @@ -241,7 +361,7 @@ export class Saturn {
* @returns {URL}
*/
createRequestURL (cidPath, opts) {
let origin = opts.url || opts.cdnURL
let origin = opts.url || (opts.origins && opts.origins[0]) || opts.cdnURL
origin = addHttpPrefix(origin)
const url = new URL(`${origin}/ipfs/${cidPath}`)

Expand Down Expand Up @@ -371,6 +491,12 @@ export class Saturn {
}
}

/**
* Sorts nodes based on normalized distance and weights. Distance is prioritized for sorting.
*
* @param {Node[]} nodes
* @returns {Node[]}
*/
_sortNodes (nodes) {
// Determine the maximum distance for normalization
const maxDistance = Math.max(...nodes.map(node => node.distance))
Expand Down
14 changes: 14 additions & 0 deletions src/types.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

/**
* @module types */

/**
*
* @typedef {object} Node
* @property {string} ip
* @property {number} weight
* @property {number} distance
* @property {string} url
*/

export {}
69 changes: 68 additions & 1 deletion test/fallback.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,74 @@ describe('Client Fallback', () => {

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test' })

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4', { url: 'node1.saturn.ms' })
const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4')

const buffer = await concatChunks(cid)
const actualContent = String.fromCharCode(...buffer)
const expectedContent = 'hello world\n'

assert.strictEqual(actualContent, expectedContent)
server.close()
mock.reset()
})

test('Content Fallback fetches a cid properly with race', async (t) => {
const handlers = [
mockOrchHandler(5, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockJWT(TEST_AUTH),
mockSaturnOriginHandler(TEST_ORIGIN_DOMAIN, 0, true),
...mockNodesHandlers(5, TEST_ORIGIN_DOMAIN)
]
const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)

const expectedNodes = generateNodes(3, TEST_ORIGIN_DOMAIN)

// Mocking storage object
const mockStorage = {
get: async (key) => expectedNodes,
set: async (key, value) => { return null }
}
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test' })
// const origins =

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4', { raceNodes: true })

const buffer = await concatChunks(cid)
const actualContent = String.fromCharCode(...buffer)
const expectedContent = 'hello world\n'

assert.strictEqual(actualContent, expectedContent)
server.close()
mock.reset()
})

test('Content Fallback with race fetches from consecutive nodes on failure', async (t) => {
const handlers = [
mockOrchHandler(5, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockJWT(TEST_AUTH),
mockSaturnOriginHandler(TEST_ORIGIN_DOMAIN, 0, true),
...mockNodesHandlers(5, TEST_ORIGIN_DOMAIN, 2)
]
const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)

const expectedNodes = generateNodes(5, TEST_ORIGIN_DOMAIN)

// Mocking storage object
const mockStorage = {
get: async (key) => expectedNodes,
set: async (key, value) => { return null }
}
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test' })

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4', { raceNodes: true })

const buffer = await concatChunks(cid)
const actualContent = String.fromCharCode(...buffer)
Expand Down
21 changes: 13 additions & 8 deletions test/test-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,13 @@ import fs from 'fs'
import { addHttpPrefix } from '../src/utils/url.js'

const HTTP_STATUS_OK = 200
const HTTP_STATUS_TIMEOUT = 504

const __dirname = dirname(fileURLToPath(import.meta.url))
process.env.TESTING = 'true'

/**
*
* @typedef {object} Node
* @property {string} ip
* @property {number} weight
* @property {number} distance
* @property {string} url
* @typedef {import('../src/types.js').Node} Node
*/

/**
Expand Down Expand Up @@ -123,14 +119,23 @@ export function mockJWT (authURL) {
*
* @param {number} count - amount of nodes to mock
* @param {string} originDomain - saturn origin domain.
* @param {number} failures
* @returns {RestHandler<any>[]}
*/
export function mockNodesHandlers (count, originDomain) {
export function mockNodesHandlers (count, originDomain, failures = 0) {
if (failures > count) {
throw Error('failures number cannot exceed node count')
}
const nodes = generateNodes(count, originDomain)

const handlers = nodes.map((node) => {
const handlers = nodes.map((node, idx) => {
const url = `${node.url}/ipfs/:cid`
return rest.get(url, (req, res, ctx) => {
if (idx < failures) {
return res(
ctx.status(HTTP_STATUS_TIMEOUT)
)
}
const filepath = getFixturePath('hello.car')
const fileContents = fs.readFileSync(filepath)
return res(
Expand Down

0 comments on commit 3b4e8db

Please sign in to comment.