Skip to content

Commit

Permalink
enhancement: error handling + exp flag (#31)
Browse files Browse the repository at this point in the history
* enhancement: error handling + exp flag

* fix formatting

* url handling

* fix url issue
  • Loading branch information
AmeanAsad authored Oct 27, 2023
1 parent 3b4e8db commit 8b532ec
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 28 deletions.
42 changes: 30 additions & 12 deletions src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { memoryStorage } from './storage/index.js'
import { getJWT } from './utils/jwt.js'
import { parseUrl, addHttpPrefix } from './utils/url.js'
import { isBrowserContext } from './utils/runtime.js'
import { isErrorUnavoidable } from './utils/errors.js'

const MAX_NODE_WEIGHT = 100
/**
Expand All @@ -28,6 +29,7 @@ export class Saturn {
* @param {number} [opts.downloadTimeout=0]
* @param {string} [opts.orchURL]
* @param {number} [opts.fallbackLimit]
* @param {boolean} [opts.experimental]
* @param {import('./storage/index.js').Storage} [opts.storage]
*/
constructor (opts = {}) {
Expand All @@ -54,7 +56,7 @@ export class Saturn {
this._monitorPerformanceBuffer()
}
this.storage = this.opts.storage || memoryStorage()
this.loadNodesPromise = this._loadNodes(this.opts)
this.loadNodesPromise = this.opts.experimental ? this._loadNodes(this.opts) : null
}

/**
Expand All @@ -81,7 +83,11 @@ export class Saturn {
}
}

const origins = options.origins
let origins = options.origins
if (!origins || origins.length === 0) {
const replacementUrl = options.url ?? options.cdnURL
origins = [replacementUrl]
}
const controllers = []

const createFetchPromise = async (origin) => {
Expand Down Expand Up @@ -125,11 +131,12 @@ export class Saturn {

abortRemainingFetches(controller, controllers)
log = Object.assign(log, this._generateLog(res, log), { url })

if (!res.ok) {
throw new Error(
`Non OK response received: ${res.status} ${res.statusText}`
const error = new Error(
`Non OK response received: ${res.status} ${res.statusText}`
)
error.res = res
throw error
}
} catch (err) {
if (!res) {
Expand Down Expand Up @@ -184,11 +191,12 @@ export class Saturn {
clearTimeout(connectTimeout)

log = Object.assign(log, this._generateLog(res, log))

if (!res.ok) {
throw new Error(
const error = new Error(
`Non OK response received: ${res.status} ${res.statusText}`
)
error.res = res
throw error
}
} catch (err) {
if (!res) {
Expand Down Expand Up @@ -240,6 +248,10 @@ export class Saturn {
// this is temporary until range requests are supported.
let byteCountCheckpoint = 0

const throwError = () => {
throw new Error(`All attempts to fetch content have failed. Last error: ${lastError.message}`)
}

const fetchContent = async function * () {
let byteCount = 0
const byteChunks = await this.fetchContent(cidPath, opts)
Expand Down Expand Up @@ -268,6 +280,9 @@ export class Saturn {
return
} catch (err) {
lastError = err
if (err.res?.status === 410 || isErrorUnavoidable(err)) {
throwError()
}
await this.loadNodesPromise
}
}
Expand All @@ -290,21 +305,24 @@ export class Saturn {
return
} catch (err) {
lastError = err
if (err.res?.status === 410 || isErrorUnavoidable(err)) {
break
}
}
fallbackCount += 1
}

if (lastError) {
throw new Error(`All attempts to fetch content have failed. Last error: ${lastError.message}`)
throwError()
}
}

/**
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]- -
* @param {boolean} [opts.raceNodes]- -
* @param {('car'|'raw')} [opts.format]
* @param {boolean} [opts.raceNodes]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @returns {Promise<AsyncIterable<Uint8Array>>}
Expand Down Expand Up @@ -391,8 +409,8 @@ export class Saturn {
* @param {object} log
*/
reportLogs (log) {
if (!this.reportingLogs) return
this.logs.push(log)
if (!this.reportingLogs) return
this.reportLogsTimeout && clearTimeout(this.reportLogsTimeout)
this.reportLogsTimeout = setTimeout(this._reportLogs.bind(this), 3_000)
}
Expand Down Expand Up @@ -560,6 +578,6 @@ export class Saturn {
nodes = await orchNodesListPromise
nodes = this._sortNodes(nodes)
this.nodes = nodes
this.storage?.set(Saturn.nodesListKey, nodes)
this.storage.set(Saturn.nodesListKey, nodes)
}
}
19 changes: 19 additions & 0 deletions src/utils/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,22 @@ export class TimeoutError extends Error {
this.name = 'TimeoutError'
}
}

export function isErrorUnavoidable (error) {
if (!error || typeof error.message !== 'string') return false

const errorPatterns = [
/file does not exist/,
/Cannot read properties of undefined \(reading '([^']+)'\)/,
/([a-zA-Z_.]+) is undefined/,
/undefined is not an object \(evaluating '([^']+)'\)/
]

for (const pattern of errorPatterns) {
if (pattern.test(error.message)) {
return true
}
}

return false
}
89 changes: 78 additions & 11 deletions test/fallback.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ import assert from 'node:assert/strict'
import { describe, mock, test } from 'node:test'

import { Saturn } from '#src/index.js'
import { concatChunks, generateNodes, getMockServer, mockJWT, mockNodesHandlers, mockOrchHandler, mockSaturnOriginHandler, MSW_SERVER_OPTS } from './test-utils.js'
import { concatChunks, generateNodes, getMockServer, HTTP_STATUS_GONE, mockJWT, mockNodesHandlers, mockOrchHandler, mockSaturnOriginHandler, MSW_SERVER_OPTS } from './test-utils.js'

const TEST_DEFAULT_ORCH = 'https://orchestrator.strn.pl/nodes'
const TEST_NODES_LIST_KEY = 'saturn-nodes'
const TEST_AUTH = 'https://fz3dyeyxmebszwhuiky7vggmsu0rlkoy.lambda-url.us-west-2.on.aws/'
const TEST_ORIGIN_DOMAIN = 'saturn.ms'
const CLIENT_KEY = 'key'

const experimental = true

describe('Client Fallback', () => {
test('Nodes are loaded from the orchestrator if no storage is passed', async (t) => {
const handlers = [
Expand All @@ -21,7 +24,7 @@ describe('Client Fallback', () => {
const expectedNodes = generateNodes(2, TEST_ORIGIN_DOMAIN)

// No Storage is injected
const saturn = new Saturn({ clientKey: CLIENT_KEY })
const saturn = new Saturn({ clientKey: CLIENT_KEY, experimental })
const mockOpts = { orchURL: TEST_DEFAULT_ORCH }

await saturn._loadNodes(mockOpts)
Expand Down Expand Up @@ -50,7 +53,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY })
const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, experimental })

// Mocking options
const mockOpts = { orchURL: TEST_DEFAULT_ORCH }
Expand Down Expand Up @@ -87,7 +90,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY })
const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, experimental })

// Mocking options
const mockOpts = { orchURL: TEST_DEFAULT_ORCH }
Expand Down Expand Up @@ -126,7 +129,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test' })
const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test', experimental })

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4')

Expand Down Expand Up @@ -159,7 +162,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test' })
const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test', experimental })
// const origins =

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4', { raceNodes: true })
Expand Down Expand Up @@ -193,7 +196,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test' })
const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test', experimental })

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4', { raceNodes: true })

Expand All @@ -215,7 +218,7 @@ describe('Client Fallback', () => {

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test' })
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })

const fetchContentMock = mock.fn(async function * (cidPath, opts) {
yield Buffer.from('chunk1')
Expand Down Expand Up @@ -243,7 +246,7 @@ describe('Client Fallback', () => {

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test' })
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })

const fetchContentMock = mock.fn(async function * (cidPath, opts) { throw new Error('Fetch error') }) // eslint-disable-line
saturn.fetchContent = fetchContentMock
Expand All @@ -264,6 +267,70 @@ describe('Client Fallback', () => {
server.close()
})

test('Should abort fallback on 410s', async () => {
const numNodes = 3
const handlers = [
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockJWT(TEST_AUTH),
...mockNodesHandlers(numNodes, TEST_ORIGIN_DOMAIN, 3, HTTP_STATUS_GONE)
]

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })
await saturn.loadNodesPromise

let error
try {
for await (const _ of saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4')) { // eslint-disable-line
// This loop body shouldn't be reached.
}
} catch (e) {
error = e
}
const logs = saturn.logs

assert(error)
assert.strictEqual(logs.length, 1)
mock.reset()
server.close()
})

test('Should abort fallback on specific errors', async () => {
const numNodes = 3
const handlers = [
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockJWT(TEST_AUTH),
...mockNodesHandlers(numNodes, TEST_ORIGIN_DOMAIN, 3, HTTP_STATUS_GONE)
]

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })
await saturn.loadNodesPromise

let callCount = 0
const fetchContentMock = mock.fn(async function * (cidPath, opts) {
callCount++
yield ''
throw new Error('file does not exist')
})

saturn.fetchContent = fetchContentMock

let error
try {
for await (const _ of saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4')) { // eslint-disable-line
}
} catch (e) {
error = e
}

assert(error)
assert.strictEqual(callCount, 1)
mock.reset()
server.close()
})
test('Handles fallback with chunk overlap correctly', async () => {
const numNodes = 3
const handlers = [
Expand All @@ -274,7 +341,7 @@ describe('Client Fallback', () => {

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test' })
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })

let callCount = 0
const fetchContentMock = mock.fn(async function * (cidPath, opts) {
Expand Down Expand Up @@ -313,7 +380,7 @@ describe('Client Fallback', () => {

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test' })
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })

let callCount = 0
let fetchContentMock = mock.fn(async function * (cidPath, opts) {
Expand Down
11 changes: 6 additions & 5 deletions test/test-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import { fileURLToPath } from 'node:url'
import fs from 'fs'
import { addHttpPrefix } from '../src/utils/url.js'

const HTTP_STATUS_OK = 200
const HTTP_STATUS_TIMEOUT = 504
export const HTTP_STATUS_OK = 200
export const HTTP_STATUS_TIMEOUT = 504
export const HTTP_STATUS_GONE = 410

const __dirname = dirname(fileURLToPath(import.meta.url))
process.env.TESTING = 'true'
Expand Down Expand Up @@ -120,20 +121,20 @@ export function mockJWT (authURL) {
* @param {number} count - amount of nodes to mock
* @param {string} originDomain - saturn origin domain.
* @param {number} failures
* @param {number} failureCode
* @returns {RestHandler<any>[]}
*/
export function mockNodesHandlers (count, originDomain, failures = 0) {
export function mockNodesHandlers (count, originDomain, failures = 0, failureCode = HTTP_STATUS_TIMEOUT) {
if (failures > count) {
throw Error('failures number cannot exceed node count')
}
const nodes = generateNodes(count, originDomain)

const handlers = nodes.map((node, idx) => {
const url = `${node.url}/ipfs/:cid`
return rest.get(url, (req, res, ctx) => {
if (idx < failures) {
return res(
ctx.status(HTTP_STATUS_TIMEOUT)
ctx.status(failureCode)
)
}
const filepath = getFixturePath('hello.car')
Expand Down

0 comments on commit 8b532ec

Please sign in to comment.