aboutsummaryrefslogtreecommitdiff
path: root/extension/src/webext-bridge/internal
diff options
context:
space:
mode:
Diffstat (limited to 'extension/src/webext-bridge/internal')
-rw-r--r--extension/src/webext-bridge/internal/connection-args.ts31
-rw-r--r--extension/src/webext-bridge/internal/delivery-logger.ts28
-rw-r--r--extension/src/webext-bridge/internal/endpoint-fingerprint.ts5
-rw-r--r--extension/src/webext-bridge/internal/endpoint-runtime.ts187
-rw-r--r--extension/src/webext-bridge/internal/endpoint.ts20
-rw-r--r--extension/src/webext-bridge/internal/is-internal-endpoint.ts5
-rw-r--r--extension/src/webext-bridge/internal/message-port.ts52
-rw-r--r--extension/src/webext-bridge/internal/persistent-port.ts126
-rw-r--r--extension/src/webext-bridge/internal/port-message.ts48
-rw-r--r--extension/src/webext-bridge/internal/post-message.ts52
-rw-r--r--extension/src/webext-bridge/internal/stream.ts179
-rw-r--r--extension/src/webext-bridge/internal/types.ts6
12 files changed, 739 insertions, 0 deletions
diff --git a/extension/src/webext-bridge/internal/connection-args.ts b/extension/src/webext-bridge/internal/connection-args.ts
new file mode 100644
index 0000000..9b93e19
--- /dev/null
+++ b/extension/src/webext-bridge/internal/connection-args.ts
@@ -0,0 +1,31 @@
+import type { EndpointFingerprint } from './endpoint-fingerprint'
+
+export interface ConnectionArgs {
+ endpointName: string
+ fingerprint: EndpointFingerprint
+}
+
+const isValidConnectionArgs = (
+ args: unknown,
+ requiredKeys: (keyof ConnectionArgs)[] = ['endpointName', 'fingerprint'],
+): args is ConnectionArgs =>
+ typeof args === 'object'
+ && args !== null
+ && requiredKeys.every(k => k in args)
+
+export const encodeConnectionArgs = (args: ConnectionArgs) => {
+ if (!isValidConnectionArgs(args))
+ throw new TypeError('Invalid connection args')
+
+ return JSON.stringify(args)
+}
+
+export const decodeConnectionArgs = (encodedArgs: string): ConnectionArgs => {
+ try {
+ const args = JSON.parse(encodedArgs)
+ return isValidConnectionArgs(args) ? args : null
+ }
+ catch (error) {
+ return null
+ }
+}
diff --git a/extension/src/webext-bridge/internal/delivery-logger.ts b/extension/src/webext-bridge/internal/delivery-logger.ts
new file mode 100644
index 0000000..395f035
--- /dev/null
+++ b/extension/src/webext-bridge/internal/delivery-logger.ts
@@ -0,0 +1,28 @@
+import type { InternalMessage } from '../types'
+import type { EndpointFingerprint } from './endpoint-fingerprint'
+
+export interface DeliveryReceipt {
+ message: InternalMessage
+ to: EndpointFingerprint
+ from: {
+ endpointId: string
+ fingerprint: EndpointFingerprint
+ }
+}
+
+export const createDeliveryLogger = () => {
+ let logs: ReadonlyArray<DeliveryReceipt> = []
+
+ return {
+ add: (...receipts: DeliveryReceipt[]) => {
+ logs = [...logs, ...receipts]
+ },
+ remove: (message: string | DeliveryReceipt[]) => {
+ logs
+ = typeof message === 'string'
+ ? logs.filter(receipt => receipt.message.transactionId !== message)
+ : logs.filter(receipt => !message.includes(receipt))
+ },
+ entries: () => logs,
+ }
+}
diff --git a/extension/src/webext-bridge/internal/endpoint-fingerprint.ts b/extension/src/webext-bridge/internal/endpoint-fingerprint.ts
new file mode 100644
index 0000000..fe3cc24
--- /dev/null
+++ b/extension/src/webext-bridge/internal/endpoint-fingerprint.ts
@@ -0,0 +1,5 @@
+import uid from 'tiny-uid'
+
+export type EndpointFingerprint = `uid::${string}`
+
+export const createFingerprint = (): EndpointFingerprint => `uid::${uid(7)}`
diff --git a/extension/src/webext-bridge/internal/endpoint-runtime.ts b/extension/src/webext-bridge/internal/endpoint-runtime.ts
new file mode 100644
index 0000000..67b4fe0
--- /dev/null
+++ b/extension/src/webext-bridge/internal/endpoint-runtime.ts
@@ -0,0 +1,187 @@
+import type { JsonValue } from 'type-fest'
+import uuid from 'tiny-uid'
+import { serializeError } from 'serialize-error'
+import type {
+ BridgeMessage,
+ DataTypeKey,
+ Destination,
+ GetDataType,
+ GetReturnType,
+ InternalMessage,
+ OnMessageCallback,
+ RuntimeContext,
+} from '../types'
+import { parseEndpoint } from './endpoint'
+
+export interface EndpointRuntime {
+ sendMessage: <
+ ReturnType extends JsonValue,
+ K extends DataTypeKey = DataTypeKey,
+ >(
+ messageID: K,
+ data: GetDataType<K, JsonValue>,
+ destination?: Destination
+ ) => Promise<GetReturnType<K, ReturnType>>
+ onMessage: <Data extends JsonValue, K extends DataTypeKey = DataTypeKey>(
+ messageID: K,
+ callback: OnMessageCallback<GetDataType<K, Data>, GetReturnType<K, any>>
+ ) => (() => void)
+ /**
+ * @internal
+ */
+ handleMessage: (message: InternalMessage) => void
+ endTransaction: (transactionID: string) => void
+}
+
+export const createEndpointRuntime = (
+ thisContext: RuntimeContext,
+ routeMessage: (msg: InternalMessage) => void,
+ localMessage?: (msg: InternalMessage) => void,
+): EndpointRuntime => {
+ const runtimeId = uuid()
+ const openTransactions = new Map<
+ string,
+ { resolve: (v: unknown) => void; reject: (e: unknown) => void }
+ >()
+ const onMessageListeners = new Map<string, OnMessageCallback<JsonValue>>()
+
+ const handleMessage = (message: InternalMessage) => {
+ if (
+ message.destination.context === thisContext
+ && !message.destination.frameId
+ && !message.destination.tabId
+ ) {
+ localMessage?.(message)
+
+ const { transactionId, messageID, messageType } = message
+
+ const handleReply = () => {
+ const transactionP = openTransactions.get(transactionId)
+ if (transactionP) {
+ const { err, data } = message
+ if (err) {
+ const dehydratedErr = err as Record<string, string>
+ const errCtr = self[dehydratedErr.name] as any
+ const hydratedErr = new (
+ typeof errCtr === 'function' ? errCtr : Error
+ )(dehydratedErr.message)
+
+ // eslint-disable-next-line no-restricted-syntax
+ for (const prop in dehydratedErr)
+ hydratedErr[prop] = dehydratedErr[prop]
+
+ transactionP.reject(hydratedErr)
+ }
+ else {
+ transactionP.resolve(data)
+ }
+ openTransactions.delete(transactionId)
+ }
+ }
+
+ const handleNewMessage = async() => {
+ let reply: JsonValue | void
+ let err: Error
+ let noHandlerFoundError = false
+
+ try {
+ const cb = onMessageListeners.get(messageID)
+ if (typeof cb === 'function') {
+ // eslint-disable-next-line n/no-callback-literal
+ reply = await cb({
+ sender: message.origin,
+ id: messageID,
+ data: message.data,
+ timestamp: message.timestamp,
+ } as BridgeMessage<JsonValue>)
+ }
+ else {
+ noHandlerFoundError = true
+ throw new Error(
+ `[webext-bridge] No handler registered in '${thisContext}' to accept messages with id '${messageID}'`,
+ )
+ }
+ }
+ catch (error) {
+ err = error
+ }
+ finally {
+ if (err) message.err = serializeError(err)
+
+ handleMessage({
+ ...message,
+ messageType: 'reply',
+ data: reply,
+ origin: { context: thisContext, tabId: null },
+ destination: message.origin,
+ hops: [],
+ })
+
+ if (err && !noHandlerFoundError)
+ // eslint-disable-next-line no-unsafe-finally
+ throw reply
+ }
+ }
+
+ switch (messageType) {
+ case 'reply':
+ return handleReply()
+ case 'message':
+ return handleNewMessage()
+ }
+ }
+
+ message.hops.push(`${thisContext}::${runtimeId}`)
+
+ return routeMessage(message)
+ }
+
+ return {
+ handleMessage,
+ endTransaction: (transactionID) => {
+ const transactionP = openTransactions.get(transactionID)
+ transactionP?.reject('Transaction was ended before it could complete')
+ openTransactions.delete(transactionID)
+ },
+ sendMessage: (messageID, data, destination = 'background') => {
+ const endpoint
+ = typeof destination === 'string'
+ ? parseEndpoint(destination)
+ : destination
+ const errFn = 'Bridge#sendMessage ->'
+
+ if (!endpoint.context) {
+ throw new TypeError(
+ `${errFn} Destination must be any one of known destinations`,
+ )
+ }
+
+ return new Promise((resolve, reject) => {
+ const payload: InternalMessage = {
+ messageID,
+ data,
+ destination: endpoint,
+ messageType: 'message',
+ transactionId: uuid(),
+ origin: { context: thisContext, tabId: null },
+ hops: [],
+ timestamp: Date.now(),
+ }
+
+ openTransactions.set(payload.transactionId, { resolve, reject })
+
+ try {
+ handleMessage(payload)
+ }
+ catch (error) {
+ openTransactions.delete(payload.transactionId)
+ reject(error)
+ }
+ })
+ },
+ onMessage: (messageID, callback) => {
+ onMessageListeners.set(messageID, callback)
+ return () => onMessageListeners.delete(messageID)
+ },
+ }
+}
diff --git a/extension/src/webext-bridge/internal/endpoint.ts b/extension/src/webext-bridge/internal/endpoint.ts
new file mode 100644
index 0000000..0c271f2
--- /dev/null
+++ b/extension/src/webext-bridge/internal/endpoint.ts
@@ -0,0 +1,20 @@
+import type { Endpoint, RuntimeContext } from '../types'
+
+const ENDPOINT_RE = /^((?:background$)|devtools|popup|options|content-script|window)(?:@(\d+)(?:\.(\d+))?)?$/
+
+export const parseEndpoint = (endpoint: string): Endpoint => {
+ const [, context, tabId, frameId] = endpoint.match(ENDPOINT_RE) || []
+
+ return {
+ context: context as RuntimeContext,
+ tabId: +tabId,
+ frameId: frameId ? +frameId : undefined,
+ }
+}
+
+export const formatEndpoint = ({ context, tabId, frameId }: Endpoint): string => {
+ if (['background', 'popup', 'options'].includes(context))
+ return context
+
+ return `${context}@${tabId}${frameId ? `.${frameId}` : ''}`
+}
diff --git a/extension/src/webext-bridge/internal/is-internal-endpoint.ts b/extension/src/webext-bridge/internal/is-internal-endpoint.ts
new file mode 100644
index 0000000..5e6ab4c
--- /dev/null
+++ b/extension/src/webext-bridge/internal/is-internal-endpoint.ts
@@ -0,0 +1,5 @@
+import type { Endpoint, RuntimeContext } from '../types'
+
+const internalEndpoints: RuntimeContext[] = ['background', 'devtools', 'content-script', 'options', 'popup']
+
+export const isInternalEndpoint = ({ context: ctx }: Endpoint): boolean => internalEndpoints.includes(ctx)
diff --git a/extension/src/webext-bridge/internal/message-port.ts b/extension/src/webext-bridge/internal/message-port.ts
new file mode 100644
index 0000000..204c11a
--- /dev/null
+++ b/extension/src/webext-bridge/internal/message-port.ts
@@ -0,0 +1,52 @@
+let promise: Promise<MessagePort>
+
+/**
+ * Returns a MessagePort for one-on-one communication
+ *
+ * Depending on which context's code runs first, either an incoming port from the other side
+ * is accepted OR a port will be offered, which the other side will then accept.
+ */
+export const getMessagePort = (
+ thisContext: 'window' | 'content-script',
+ namespace: string,
+ onMessage: (e: MessageEvent<any>) => void,
+): Promise<MessagePort> => (
+ promise ??= new Promise((resolve) => {
+ const acceptMessagingPort = (event: MessageEvent) => {
+ const { data: { cmd, scope, context }, ports } = event
+ if (cmd === 'webext-port-offer' && scope === namespace && context !== thisContext) {
+ window.removeEventListener('message', acceptMessagingPort)
+ ports[0].onmessage = onMessage
+ ports[0].postMessage('port-accepted')
+ return resolve(ports[0])
+ }
+ }
+
+ const offerMessagingPort = () => {
+ const channel = new MessageChannel()
+ channel.port1.onmessage = (event: MessageEvent) => {
+ if (event.data === 'port-accepted') {
+ window.removeEventListener('message', acceptMessagingPort)
+ return resolve(channel.port1)
+ }
+
+ onMessage?.(event)
+ }
+
+ window.postMessage({
+ cmd: 'webext-port-offer',
+ scope: namespace,
+ context: thisContext,
+ }, '*', [channel.port2])
+ }
+
+ window.addEventListener('message', acceptMessagingPort)
+
+ // one of the contexts needs to be offset by at least 1 tick to prevent a race condition
+ // where both of them are offering, and then also accepting the port at the same time
+ if (thisContext === 'window')
+ setTimeout(offerMessagingPort, 0)
+ else
+ offerMessagingPort()
+ })
+)
diff --git a/extension/src/webext-bridge/internal/persistent-port.ts b/extension/src/webext-bridge/internal/persistent-port.ts
new file mode 100644
index 0000000..2281c68
--- /dev/null
+++ b/extension/src/webext-bridge/internal/persistent-port.ts
@@ -0,0 +1,126 @@
+import browser from 'webextension-polyfill'
+import type { Runtime } from 'webextension-polyfill'
+import type { InternalMessage } from '../types'
+import { createFingerprint } from './endpoint-fingerprint'
+import type { QueuedMessage } from './types'
+import { encodeConnectionArgs } from './connection-args'
+import { createDeliveryLogger } from './delivery-logger'
+import type { StatusMessage } from './port-message'
+import { PortMessage } from './port-message'
+
+/**
+ * Manfiest V3 extensions can have their service worker terminated at any point
+ * by the browser. That termination of service worker also terminates any messaging
+ * porta created by other parts of the extension. This class is a wrapper around the
+ * built-in Port object that re-instantiates the port connection everytime it gets
+ * suspended
+ */
+export const createPersistentPort = (name = '') => {
+ const fingerprint = createFingerprint()
+ let port: Runtime.Port
+ let undeliveredQueue: ReadonlyArray<QueuedMessage> = []
+ const pendingResponses = createDeliveryLogger()
+ const onMessageListeners = new Set<
+ (message: InternalMessage, port: Runtime.Port) => void
+ >()
+ const onFailureListeners = new Set<(message: InternalMessage) => void>()
+
+ const handleMessage = (msg: StatusMessage, port: Runtime.Port) => {
+ switch (msg.status) {
+ case 'undeliverable':
+ if (
+ !undeliveredQueue.some(
+ m => m.message.messageID === msg.message.messageID,
+ )
+ ) {
+ undeliveredQueue = [
+ ...undeliveredQueue,
+ {
+ message: msg.message,
+ resolvedDestination: msg.resolvedDestination,
+ },
+ ]
+ }
+
+ return
+
+ case 'deliverable':
+ undeliveredQueue = undeliveredQueue.reduce((acc, queuedMsg) => {
+ if (queuedMsg.resolvedDestination === msg.deliverableTo) {
+ PortMessage.toBackground(port, {
+ type: 'deliver',
+ message: queuedMsg.message,
+ })
+
+ return acc
+ }
+
+ return [...acc, queuedMsg]
+ }, [] as ReadonlyArray<QueuedMessage>)
+
+ return
+
+ case 'delivered':
+ if (msg.receipt.message.messageType === 'message')
+ pendingResponses.add(msg.receipt)
+
+ return
+
+ case 'incoming':
+ if (msg.message.messageType === 'reply')
+ pendingResponses.remove(msg.message.messageID)
+
+ onMessageListeners.forEach(cb => cb(msg.message, port))
+
+ return
+
+ case 'terminated': {
+ const rogueMsgs = pendingResponses
+ .entries()
+ .filter(receipt => msg.fingerprint === receipt.to)
+ pendingResponses.remove(rogueMsgs)
+ rogueMsgs.forEach(({ message }) =>
+ onFailureListeners.forEach(cb => cb(message)),
+ )
+ }
+ }
+ }
+
+ const connect = () => {
+ port = browser.runtime.connect({
+ name: encodeConnectionArgs({
+ endpointName: name,
+ fingerprint,
+ }),
+ })
+ port.onMessage.addListener(handleMessage)
+ port.onDisconnect.addListener(connect)
+
+ PortMessage.toBackground(port, {
+ type: 'sync',
+ pendingResponses: pendingResponses.entries(),
+ pendingDeliveries: [
+ ...new Set(
+ undeliveredQueue.map(({ resolvedDestination }) => resolvedDestination),
+ ),
+ ],
+ })
+ }
+
+ connect()
+
+ return {
+ onFailure(cb: (message: InternalMessage) => void) {
+ onFailureListeners.add(cb)
+ },
+ onMessage(cb: (message: InternalMessage) => void): void {
+ onMessageListeners.add(cb)
+ },
+ postMessage(message: any): void {
+ PortMessage.toBackground(port, {
+ type: 'deliver',
+ message,
+ })
+ },
+ }
+}
diff --git a/extension/src/webext-bridge/internal/port-message.ts b/extension/src/webext-bridge/internal/port-message.ts
new file mode 100644
index 0000000..056e219
--- /dev/null
+++ b/extension/src/webext-bridge/internal/port-message.ts
@@ -0,0 +1,48 @@
+import type { Runtime } from 'webextension-polyfill'
+import type { InternalMessage } from '../types'
+import type { DeliveryReceipt } from './delivery-logger'
+import type { EndpointFingerprint } from './endpoint-fingerprint'
+
+export type StatusMessage =
+ | {
+ status: 'undeliverable'
+ message: InternalMessage
+ resolvedDestination: string
+ }
+ | {
+ status: 'deliverable'
+ deliverableTo: string
+ }
+ | {
+ status: 'delivered'
+ receipt: DeliveryReceipt
+ }
+ | {
+ status: 'incoming'
+ message: InternalMessage
+ }
+ | {
+ status: 'terminated'
+ fingerprint: EndpointFingerprint
+ }
+
+export type RequestMessage =
+ | {
+ type: 'sync'
+ pendingResponses: ReadonlyArray<DeliveryReceipt>
+ pendingDeliveries: ReadonlyArray<string>
+ }
+ | {
+ type: 'deliver'
+ message: InternalMessage
+ }
+
+export class PortMessage {
+ static toBackground(port: Runtime.Port, message: RequestMessage) {
+ return port.postMessage(message)
+ }
+
+ static toExtensionContext(port: Runtime.Port, message: StatusMessage) {
+ return port.postMessage(message)
+ }
+}
diff --git a/extension/src/webext-bridge/internal/post-message.ts b/extension/src/webext-bridge/internal/post-message.ts
new file mode 100644
index 0000000..9db4424
--- /dev/null
+++ b/extension/src/webext-bridge/internal/post-message.ts
@@ -0,0 +1,52 @@
+import type { InternalMessage } from '../types'
+import { getMessagePort } from './message-port'
+
+export interface EndpointWontRespondError {
+ type: 'error'
+ transactionID: string
+}
+
+export const usePostMessaging = (thisContext: 'window' | 'content-script') => {
+ let allocatedNamespace: string
+ let messagingEnabled = false
+ let onMessageCallback: (
+ msg: InternalMessage | EndpointWontRespondError
+ ) => void
+ let portP: Promise<MessagePort>
+
+ return {
+ enable: () => (messagingEnabled = true),
+ onMessage: (cb: typeof onMessageCallback) => (onMessageCallback = cb),
+ postMessage: async(msg: InternalMessage | EndpointWontRespondError) => {
+ if (thisContext !== 'content-script' && thisContext !== 'window')
+ throw new Error('Endpoint does not use postMessage')
+
+ if (!messagingEnabled)
+ throw new Error('Communication with window has not been allowed')
+
+ ensureNamespaceSet(allocatedNamespace)
+
+ return (await portP).postMessage(msg)
+ },
+ setNamespace: (nsps: string) => {
+ if (allocatedNamespace)
+ throw new Error('Namespace once set cannot be changed')
+
+ allocatedNamespace = nsps
+ portP = getMessagePort(thisContext, nsps, ({ data }) =>
+ onMessageCallback?.(data),
+ )
+ },
+ }
+}
+
+function ensureNamespaceSet(namespace: string) {
+ if (typeof namespace !== 'string' || namespace.trim().length === 0) {
+ throw new Error(
+ 'webext-bridge uses window.postMessage to talk with other "window"(s) for message routing'
+ + 'which is global/conflicting operation in case there are other scripts using webext-bridge. '
+ + 'Call Bridge#setNamespace(nsps) to isolate your app. Example: setNamespace(\'com.facebook.react-devtools\'). '
+ + 'Make sure to use same namespace across all your scripts whereever window.postMessage is likely to be used`',
+ )
+ }
+}
diff --git a/extension/src/webext-bridge/internal/stream.ts b/extension/src/webext-bridge/internal/stream.ts
new file mode 100644
index 0000000..54cee9d
--- /dev/null
+++ b/extension/src/webext-bridge/internal/stream.ts
@@ -0,0 +1,179 @@
+import { createNanoEvents } from 'nanoevents'
+import uuid from 'tiny-uid'
+import type { Emitter } from 'nanoevents'
+import type { JsonValue } from 'type-fest'
+import type { Endpoint, HybridUnsubscriber, RuntimeContext, StreamInfo } from '../types'
+import type { EndpointRuntime } from './endpoint-runtime'
+import { parseEndpoint } from './endpoint'
+
+/**
+ * Built on top of Bridge. Nothing much special except that Stream allows
+ * you to create a namespaced scope under a channel name of your choice
+ * and allows continuous e2e communication, with less possibility of
+ * conflicting messageId's, since streams are strictly scoped.
+ */
+export class Stream {
+ private static initDone = false
+ private static openStreams: Map<string, Stream> = new Map()
+
+ private emitter: Emitter = createNanoEvents()
+ private isClosed = false
+ constructor(private endpointRuntime: EndpointRuntime, private streamInfo: StreamInfo) {
+ if (!Stream.initDone) {
+ endpointRuntime.onMessage<{ streamId: string; action: 'transfer' | 'close'; streamTransfer: JsonValue }, string>('__crx_bridge_stream_transfer__', (msg) => {
+ const { streamId, streamTransfer, action } = msg.data
+ const stream = Stream.openStreams.get(streamId)
+ if (stream && !stream.isClosed) {
+ if (action === 'transfer')
+ stream.emitter.emit('message', streamTransfer)
+
+ if (action === 'close') {
+ Stream.openStreams.delete(streamId)
+ stream.handleStreamClose()
+ }
+ }
+ })
+ Stream.initDone = true
+ }
+
+ Stream.openStreams.set(this.streamInfo.streamId, this)
+ }
+
+ /**
+ * Returns stream info
+ */
+ public get info(): StreamInfo {
+ return this.streamInfo
+ }
+
+ /**
+ * Sends a message to other endpoint.
+ * Will trigger onMessage on the other side.
+ *
+ * Warning: Before sending sensitive data, verify the endpoint using `stream.info.endpoint.isInternal()`
+ * The other side could be malicious webpage speaking same language as webext-bridge
+ * @param msg
+ */
+ public send(msg?: JsonValue): void {
+ if (this.isClosed)
+ throw new Error('Attempting to send a message over closed stream. Use stream.onClose(<callback>) to keep an eye on stream status')
+
+ this.endpointRuntime.sendMessage('__crx_bridge_stream_transfer__', {
+ streamId: this.streamInfo.streamId,
+ streamTransfer: msg,
+ action: 'transfer',
+ }, this.streamInfo.endpoint)
+ }
+
+ /**
+ * Closes the stream.
+ * Will trigger stream.onClose(<callback>) on both endpoints.
+ * If needed again, spawn a new Stream, as this instance cannot be re-opened
+ * @param msg
+ */
+ public close(msg?: JsonValue): void {
+ if (msg)
+ this.send(msg)
+
+ this.handleStreamClose()
+
+ this.endpointRuntime.sendMessage('__crx_bridge_stream_transfer__', {
+ streamId: this.streamInfo.streamId,
+ streamTransfer: null,
+ action: 'close',
+ }, this.streamInfo.endpoint)
+ }
+
+ /**
+ * Registers a callback to fire whenever other endpoint sends a message
+ * @param callback
+ */
+ public onMessage<T extends JsonValue>(callback: (msg?: T) => void): HybridUnsubscriber {
+ return this.getDisposable('message', callback)
+ }
+
+ /**
+ * Registers a callback to fire whenever stream.close() is called on either endpoint
+ * @param callback
+ */
+ public onClose<T extends JsonValue>(callback: (msg?: T) => void): HybridUnsubscriber {
+ return this.getDisposable('closed', callback)
+ }
+
+ private handleStreamClose = () => {
+ if (!this.isClosed) {
+ this.isClosed = true
+ this.emitter.emit('closed', true)
+ this.emitter.events = {}
+ }
+ }
+
+ private getDisposable(event: string, callback: () => void): HybridUnsubscriber {
+ const off = this.emitter.on(event, callback)
+
+ return Object.assign(off, {
+ dispose: off,
+ close: off,
+ })
+ }
+}
+
+export const createStreamWirings = (endpointRuntime: EndpointRuntime) => {
+ const openStreams = new Map<string, Stream>()
+ const onOpenStreamCallbacks = new Map<string, (stream: Stream) => void>()
+ const streamyEmitter = createNanoEvents()
+
+ endpointRuntime.onMessage<{ channel: string; streamId: string }, string>('__crx_bridge_stream_open__', (message) => {
+ return new Promise((resolve) => {
+ const { sender, data } = message
+ const { channel } = data
+ let watching = false
+ let off = () => { }
+
+ const readyup = () => {
+ const callback = onOpenStreamCallbacks.get(channel)
+
+ if (typeof callback === 'function') {
+ callback(new Stream(endpointRuntime, { ...data, endpoint: sender }))
+ if (watching)
+ off()
+
+ resolve(true)
+ }
+ else if (!watching) {
+ watching = true
+ off = streamyEmitter.on('did-change-stream-callbacks', readyup)
+ }
+ }
+
+ readyup()
+ })
+ })
+
+ async function openStream(channel: string, destination: RuntimeContext | Endpoint | string): Promise<Stream> {
+ if (openStreams.has(channel))
+ throw new Error('webext-bridge: A Stream is already open at this channel')
+
+ const endpoint = typeof destination === 'string' ? parseEndpoint(destination) : destination
+
+ const streamInfo: StreamInfo = { streamId: uuid(), channel, endpoint }
+ const stream = new Stream(endpointRuntime, streamInfo)
+ stream.onClose(() => openStreams.delete(channel))
+ await endpointRuntime.sendMessage('__crx_bridge_stream_open__', streamInfo as unknown as JsonValue, endpoint)
+ openStreams.set(channel, stream)
+ return stream
+ }
+
+ function onOpenStreamChannel(channel: string, callback: (stream: Stream) => void): void {
+ if (onOpenStreamCallbacks.has(channel))
+ throw new Error('webext-bridge: This channel has already been claimed. Stream allows only one-on-one communication')
+
+ onOpenStreamCallbacks.set(channel, callback)
+ streamyEmitter.emit('did-change-stream-callbacks')
+ }
+
+ return {
+ openStream,
+ onOpenStreamChannel,
+ }
+}
diff --git a/extension/src/webext-bridge/internal/types.ts b/extension/src/webext-bridge/internal/types.ts
new file mode 100644
index 0000000..2063adb
--- /dev/null
+++ b/extension/src/webext-bridge/internal/types.ts
@@ -0,0 +1,6 @@
+import type { InternalMessage } from '../types'
+
+export interface QueuedMessage {
+ resolvedDestination: string
+ message: InternalMessage
+}