Skip to content

feat(types): Support custom events on PubSub and PeerStreams #3078

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/interface/src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export interface PubSubRPC {
messages: PubSubRPCMessage[]
}

export interface PeerStreams extends TypedEventTarget<PeerStreamEvents> {
export interface PeerStreams<PeerEvents extends PeerStreamEvents = PeerStreamEvents> extends TypedEventTarget<PeerEvents> {
id: PeerId
protocol: string
outboundStream?: Pushable<Uint8ArrayList>
Expand Down
18 changes: 9 additions & 9 deletions packages/pubsub/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import {
verifySignature
} from './sign.js'
import { toMessage, ensureArray, noSignMsgId, msgId, toRpcMessage, randomSeqno } from './utils.js'
import type { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult, TopicValidatorFn, ComponentLogger, Logger, Connection, PeerId, PrivateKey, IncomingStreamData } from '@libp2p/interface'
import type { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult, TopicValidatorFn, ComponentLogger, Logger, Connection, PeerId, PrivateKey, IncomingStreamData, PeerStreamEvents } from '@libp2p/interface'
import type { Registrar } from '@libp2p/interface-internal'
import type { Uint8ArrayList } from 'uint8arraylist'

Expand All @@ -55,7 +55,7 @@ export interface PubSubComponents {
* PubSubBaseProtocol handles the peers and connections logic for pubsub routers
* and specifies the API that pubsub routers should have.
*/
export abstract class PubSubBaseProtocol<Events extends Record<string, any> = PubSubEvents> extends TypedEventEmitter<Events> implements PubSub<Events> {
export abstract class PubSubBaseProtocol<Events extends Record<string, any> = PubSubEvents, PeerEvents extends PeerStreamEvents = PeerStreamEvents> extends TypedEventEmitter<Events> implements PubSub<Events> {
protected log: Logger

public started: boolean
Expand All @@ -70,7 +70,7 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
/**
* Map of peer streams
*/
public peers: PeerMap<PeerStreams>
public peers: PeerMap<PeerStreams<PeerEvents>>
/**
* The signature policy to follow by default
*/
Expand Down Expand Up @@ -119,7 +119,7 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
this.started = false
this.topics = new Map()
this.subscriptions = new Set()
this.peers = new PeerMap<PeerStreams>()
this.peers = new PeerMap<PeerStreams<PeerEvents>>()
this.globalSignaturePolicy = globalSignaturePolicy === 'StrictNoSign' ? 'StrictNoSign' : 'StrictSign'
this.canRelayMessage = canRelayMessage
this.emitSelf = emitSelf
Expand Down Expand Up @@ -268,7 +268,7 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
/**
* Notifies the router that a peer has been connected
*/
addPeer (peerId: PeerId, protocol: string): PeerStreams {
addPeer (peerId: PeerId, protocol: string): PeerStreams<PeerEvents> {
const existing = this.peers.get(peerId)

// If peer streams already exists, do nothing
Expand All @@ -279,7 +279,7 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
// else create a new peer streams
this.log('new peer %p', peerId)

const peerStreams: PeerStreams = new PeerStreamsImpl(this.components, {
const peerStreams: PeerStreams<PeerEvents> = new PeerStreamsImpl<PeerEvents>(this.components, {
id: peerId,
protocol
})
Expand All @@ -295,7 +295,7 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
/**
* Notifies the router that a peer has been disconnected
*/
protected _removePeer (peerId: PeerId): PeerStreams | undefined {
protected _removePeer (peerId: PeerId): PeerStreams<PeerEvents> | undefined {
const peerStreams = this.peers.get(peerId)
if (peerStreams == null) {
return
Expand All @@ -321,7 +321,7 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
/**
* Responsible for processing each RPC message received by other peers.
*/
async processMessages (peerId: PeerId, stream: AsyncIterable<Uint8ArrayList>, peerStreams: PeerStreams): Promise<void> {
async processMessages (peerId: PeerId, stream: AsyncIterable<Uint8ArrayList>, peerStreams: PeerStreams<PeerEvents>): Promise<void> {
try {
await pipe(
stream,
Expand Down Expand Up @@ -369,7 +369,7 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
/**
* Handles an rpc request from a peer
*/
async processRpc (from: PeerId, peerStreams: PeerStreams, rpc: PubSubRPC): Promise<boolean> {
async processRpc (from: PeerId, peerStreams: PeerStreams<PeerEvents>, rpc: PubSubRPC): Promise<boolean> {
if (!this.acceptFrom(from)) {
this.log('received message from unacceptable peer %p', from)
return false
Expand Down
2 changes: 1 addition & 1 deletion packages/pubsub/src/peer-streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export interface DecoderOptions extends LpDecoderOptions {
/**
* Thin wrapper around a peer's inbound / outbound pubsub streams
*/
export class PeerStreams extends TypedEventEmitter<PeerStreamEvents> {
export class PeerStreams<PeerEvents extends PeerStreamEvents = PeerStreamEvents> extends TypedEventEmitter<PeerEvents> {
public readonly id: PeerId
public readonly protocol: string
/**
Expand Down