From 59eabb08383d3a3027b6e6be1130ab838c48d060 Mon Sep 17 00:00:00 2001 From: Tyson Date: Mon, 14 Apr 2025 12:29:44 +1000 Subject: [PATCH] feat(types): Support custom events on PubSub and PeerStreams --- packages/interface/src/pubsub.ts | 2 +- packages/pubsub/src/index.ts | 18 +++++++++--------- packages/pubsub/src/peer-streams.ts | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/interface/src/pubsub.ts b/packages/interface/src/pubsub.ts index 97180448e0..8c81c471d0 100644 --- a/packages/interface/src/pubsub.ts +++ b/packages/interface/src/pubsub.ts @@ -66,7 +66,7 @@ export interface PubSubRPC { messages: PubSubRPCMessage[] } -export interface PeerStreams extends TypedEventTarget { +export interface PeerStreams extends TypedEventTarget { id: PeerId protocol: string outboundStream?: Pushable diff --git a/packages/pubsub/src/index.ts b/packages/pubsub/src/index.ts index c130640462..df8a774996 100644 --- a/packages/pubsub/src/index.ts +++ b/packages/pubsub/src/index.ts @@ -40,7 +40,7 @@ import { verifySignature } from './sign.js' import { toMessage, ensureArray, noSignMsgId, msgId, toRpcMessage, randomSeqno } from './utils.js' -import type { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult, TopicValidatorFn, ComponentLogger, Logger, Connection, PeerId, PrivateKey, IncomingStreamData } from '@libp2p/interface' +import type { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult, TopicValidatorFn, ComponentLogger, Logger, Connection, PeerId, PrivateKey, IncomingStreamData, PeerStreamEvents } from '@libp2p/interface' import type { Registrar } from '@libp2p/interface-internal' import type { Uint8ArrayList } from 'uint8arraylist' @@ -55,7 +55,7 @@ export interface PubSubComponents { * PubSubBaseProtocol handles the peers and connections logic for pubsub routers * and specifies the API that pubsub routers should have. */ -export abstract class PubSubBaseProtocol = PubSubEvents> extends TypedEventEmitter implements PubSub { +export abstract class PubSubBaseProtocol = PubSubEvents, PeerEvents extends PeerStreamEvents = PeerStreamEvents> extends TypedEventEmitter implements PubSub { protected log: Logger public started: boolean @@ -70,7 +70,7 @@ export abstract class PubSubBaseProtocol = Pu /** * Map of peer streams */ - public peers: PeerMap + public peers: PeerMap> /** * The signature policy to follow by default */ @@ -119,7 +119,7 @@ export abstract class PubSubBaseProtocol = Pu this.started = false this.topics = new Map() this.subscriptions = new Set() - this.peers = new PeerMap() + this.peers = new PeerMap>() this.globalSignaturePolicy = globalSignaturePolicy === 'StrictNoSign' ? 'StrictNoSign' : 'StrictSign' this.canRelayMessage = canRelayMessage this.emitSelf = emitSelf @@ -268,7 +268,7 @@ export abstract class PubSubBaseProtocol = Pu /** * Notifies the router that a peer has been connected */ - addPeer (peerId: PeerId, protocol: string): PeerStreams { + addPeer (peerId: PeerId, protocol: string): PeerStreams { const existing = this.peers.get(peerId) // If peer streams already exists, do nothing @@ -279,7 +279,7 @@ export abstract class PubSubBaseProtocol = Pu // else create a new peer streams this.log('new peer %p', peerId) - const peerStreams: PeerStreams = new PeerStreamsImpl(this.components, { + const peerStreams: PeerStreams = new PeerStreamsImpl(this.components, { id: peerId, protocol }) @@ -295,7 +295,7 @@ export abstract class PubSubBaseProtocol = Pu /** * Notifies the router that a peer has been disconnected */ - protected _removePeer (peerId: PeerId): PeerStreams | undefined { + protected _removePeer (peerId: PeerId): PeerStreams | undefined { const peerStreams = this.peers.get(peerId) if (peerStreams == null) { return @@ -321,7 +321,7 @@ export abstract class PubSubBaseProtocol = Pu /** * Responsible for processing each RPC message received by other peers. */ - async processMessages (peerId: PeerId, stream: AsyncIterable, peerStreams: PeerStreams): Promise { + async processMessages (peerId: PeerId, stream: AsyncIterable, peerStreams: PeerStreams): Promise { try { await pipe( stream, @@ -369,7 +369,7 @@ export abstract class PubSubBaseProtocol = Pu /** * Handles an rpc request from a peer */ - async processRpc (from: PeerId, peerStreams: PeerStreams, rpc: PubSubRPC): Promise { + async processRpc (from: PeerId, peerStreams: PeerStreams, rpc: PubSubRPC): Promise { if (!this.acceptFrom(from)) { this.log('received message from unacceptable peer %p', from) return false diff --git a/packages/pubsub/src/peer-streams.ts b/packages/pubsub/src/peer-streams.ts index cd5dbda39c..f2ebb2a554 100644 --- a/packages/pubsub/src/peer-streams.ts +++ b/packages/pubsub/src/peer-streams.ts @@ -24,7 +24,7 @@ export interface DecoderOptions extends LpDecoderOptions { /** * Thin wrapper around a peer's inbound / outbound pubsub streams */ -export class PeerStreams extends TypedEventEmitter { +export class PeerStreams extends TypedEventEmitter { public readonly id: PeerId public readonly protocol: string /**