Skip to content

Commit 42b5b92

Browse files
maschadwemeetagain
andauthored
feat: graft/prune events and mesh peer tagging (#383)
* reset to master * feat: default to disabled tagging for graft / prune * chore: linting fixes + restore package-lock.json * refactor: place the tag function in sendGraft * fix: ensure tags are pruned / grafted per topic * test: added tagging tests to ensure value is properly set * test: added test to ensure peer store is cleared of tags * test: fix issue with flakey tagging test * chore: remove vulnerabilities from pacakge-lock.json * fix: handle error for peerStore merge * fix: PR feedback adjustments * refactor: make taggingMesh flag required * chore: clean up feature * chore: pr review --------- Co-authored-by: Cayman <[email protected]>
1 parent f255ae4 commit 42b5b92

File tree

2 files changed

+149
-25
lines changed

2 files changed

+149
-25
lines changed

src/index.ts

Lines changed: 88 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ import { removeFirstNItemsFromSet, removeItemsFromSet } from './utils/set.js'
6363
import { SimpleTimeCache } from './utils/time-cache.js'
6464
import type { GossipsubOptsSpec } from './config.js'
6565
import type {
66-
Connection, Stream, PeerId, Peer, PeerStore,
66+
Connection, Direction, Stream, PeerId, Peer, PeerStore,
6767
Message,
6868
PublishResult,
6969
PubSub,
@@ -189,6 +189,11 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
189189
* Limits to bound protobuf decoding
190190
*/
191191
decodeRpcLimits?: DecodeRPCLimits
192+
193+
/**
194+
* If true, will utilize the libp2p connection manager tagging system to prune/graft connections to peers, defaults to true
195+
*/
196+
tagMeshPeers: boolean
192197
}
193198

194199
export interface GossipsubMessage {
@@ -197,9 +202,17 @@ export interface GossipsubMessage {
197202
msg: Message
198203
}
199204

205+
export interface MeshPeer {
206+
peerId: string
207+
topic: string
208+
direction: Direction
209+
}
210+
200211
export interface GossipsubEvents extends PubSubEvents {
201212
'gossipsub:heartbeat': CustomEvent
202213
'gossipsub:message': CustomEvent<GossipsubMessage>
214+
'gossipsub:graft': CustomEvent<MeshPeer>
215+
'gossipsub:prune': CustomEvent<MeshPeer>
203216
}
204217

205218
enum GossipStatusCode {
@@ -408,6 +421,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
408421
fallbackToFloodsub: true,
409422
floodPublish: true,
410423
batchPublish: false,
424+
tagMeshPeers: true,
411425
doPX: false,
412426
directPeers: [],
413427
D: constants.GossipsubD,
@@ -635,6 +649,11 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
635649
})
636650
}, constants.GossipsubDirectConnectInitialDelay)
637651

652+
if (this.opts.tagMeshPeers) {
653+
this.addEventListener('gossipsub:graft', this.tagMeshPeer)
654+
this.addEventListener('gossipsub:prune', this.untagMeshPeer)
655+
}
656+
638657
this.log('started')
639658
}
640659

@@ -652,6 +671,11 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
652671
const { registrarTopologyIds } = this.status
653672
this.status = { code: GossipStatusCode.stopped }
654673

674+
if (this.opts.tagMeshPeers) {
675+
this.removeEventListener('gossipsub:graft', this.tagMeshPeer)
676+
this.removeEventListener('gossipsub:prune', this.untagMeshPeer)
677+
}
678+
655679
// unregister protocol and handlers
656680
const registrar = this.components.registrar
657681
await Promise.all(this.multicodecs.map(async (multicodec) => registrar.unhandle(multicodec)))
@@ -1507,6 +1531,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
15071531
if (topicID == null) {
15081532
return
15091533
}
1534+
15101535
const peersInMesh = this.mesh.get(topicID)
15111536
if (peersInMesh == null) {
15121537
// don't do PX when there is an unknown topic to avoid leaking our peers
@@ -1520,38 +1545,38 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
15201545
return
15211546
}
15221547

1548+
const backoffExpiry = this.backoff.get(topicID)?.get(id)
1549+
1550+
// This if/else chain contains the various cases of valid (and semi-valid) GRAFTs
1551+
// Most of these cases result in a PRUNE immediately being sent in response
1552+
15231553
// we don't GRAFT to/from direct peers; complain loudly if this happens
15241554
if (this.direct.has(id)) {
15251555
this.log('GRAFT: ignoring request from direct peer %s', id)
15261556
// this is possibly a bug from a non-reciprical configuration; send a PRUNE
15271557
prune.push(topicID)
15281558
// but don't px
15291559
doPX = false
1530-
return
1531-
}
15321560

1533-
// make sure we are not backing off that peer
1534-
const expire = this.backoff.get(topicID)?.get(id)
1535-
if (typeof expire === 'number' && now < expire) {
1561+
// make sure we are not backing off that peer
1562+
} else if (typeof backoffExpiry === 'number' && now < backoffExpiry) {
15361563
this.log('GRAFT: ignoring backed off peer %s', id)
15371564
// add behavioral penalty
15381565
this.score.addPenalty(id, 1, ScorePenalty.GraftBackoff)
15391566
// no PX
15401567
doPX = false
15411568
// check the flood cutoff -- is the GRAFT coming too fast?
1542-
const floodCutoff = expire + this.opts.graftFloodThreshold - this.opts.pruneBackoff
1569+
const floodCutoff = backoffExpiry + this.opts.graftFloodThreshold - this.opts.pruneBackoff
15431570
if (now < floodCutoff) {
15441571
// extra penalty
15451572
this.score.addPenalty(id, 1, ScorePenalty.GraftBackoff)
15461573
}
15471574
// refresh the backoff
15481575
this.addBackoff(id, topicID)
15491576
prune.push(topicID)
1550-
return
1551-
}
15521577

1553-
// check the score
1554-
if (score < 0) {
1578+
// check the score
1579+
} else if (score < 0) {
15551580
// we don't GRAFT peers with negative score
15561581
this.log('GRAFT: ignoring peer %s with negative score: score=%d, topic=%s', id, score, topicID)
15571582
// we do send them PRUNE however, because it's a matter of protocol correctness
@@ -1560,23 +1585,24 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
15601585
doPX = false
15611586
// add/refresh backoff so that we don't reGRAFT too early even if the score decays
15621587
this.addBackoff(id, topicID)
1563-
return
1564-
}
15651588

1566-
// check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts
1567-
// from peers with outbound connections; this is a defensive check to restrict potential
1568-
// mesh takeover attacks combined with love bombing
1569-
if (peersInMesh.size >= this.opts.Dhi && !(this.outbound.get(id) ?? false)) {
1589+
// check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts
1590+
// from peers with outbound connections; this is a defensive check to restrict potential
1591+
// mesh takeover attacks combined with love bombing
1592+
} else if (peersInMesh.size >= this.opts.Dhi && !(this.outbound.get(id) ?? false)) {
15701593
prune.push(topicID)
15711594
this.addBackoff(id, topicID)
1572-
return
1573-
}
15741595

1575-
this.log('GRAFT: Add mesh link from %s in %s', id, topicID)
1576-
this.score.graft(id, topicID)
1577-
peersInMesh.add(id)
1596+
// valid graft
1597+
} else {
1598+
this.log('GRAFT: Add mesh link from %s in %s', id, topicID)
1599+
this.score.graft(id, topicID)
1600+
peersInMesh.add(id)
1601+
1602+
this.metrics?.onAddToMesh(topicID, InclusionReason.Subscribed, 1)
1603+
}
15781604

1579-
this.metrics?.onAddToMesh(topicID, InclusionReason.Subscribed, 1)
1605+
this.safeDispatchEvent<MeshPeer>('gossipsub:graft', { detail: { peerId: id, topic: topicID, direction: 'inbound' } })
15801606
})
15811607

15821608
if (prune.length === 0) {
@@ -1627,10 +1653,12 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
16271653
score,
16281654
topicID
16291655
)
1630-
continue
1656+
} else {
1657+
await this.pxConnect(peers)
16311658
}
1632-
await this.pxConnect(peers)
16331659
}
1660+
1661+
this.safeDispatchEvent<MeshPeer>('gossipsub:prune', { detail: { peerId: id, topic: topicID, direction: 'inbound' } })
16341662
}
16351663
}
16361664

@@ -2325,6 +2353,21 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
23252353

23262354
this.metrics?.onRpcSent(rpc, rpcBytes.length)
23272355

2356+
if (rpc.control?.graft != null) {
2357+
for (const topic of rpc.control?.graft) {
2358+
if (topic.topicID != null) {
2359+
this.safeDispatchEvent<MeshPeer>('gossipsub:graft', { detail: { peerId: id, topic: topic.topicID, direction: 'outbound' } })
2360+
}
2361+
}
2362+
}
2363+
if (rpc.control?.prune != null) {
2364+
for (const topic of rpc.control?.prune) {
2365+
if (topic.topicID != null) {
2366+
this.safeDispatchEvent<MeshPeer>('gossipsub:prune', { detail: { peerId: id, topic: topic.topicID, direction: 'outbound' } })
2367+
}
2368+
}
2369+
}
2370+
23282371
return true
23292372
}
23302373

@@ -3023,6 +3066,26 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
30233066

30243067
metrics.registerScoreWeights(sw)
30253068
}
3069+
3070+
private readonly tagMeshPeer = (evt: CustomEvent<MeshPeer>): void => {
3071+
const { peerId, topic } = evt.detail
3072+
this.components.peerStore.merge(peerIdFromString(peerId), {
3073+
tags: {
3074+
[topic]: {
3075+
value: 100
3076+
}
3077+
}
3078+
}).catch((err) => { this.log.error('Error tagging peer %s with topic %s', peerId, topic, err) })
3079+
}
3080+
3081+
private readonly untagMeshPeer = (evt: CustomEvent<MeshPeer>): void => {
3082+
const { peerId, topic } = evt.detail
3083+
this.components.peerStore.merge(peerIdFromString(peerId), {
3084+
tags: {
3085+
[topic]: undefined
3086+
}
3087+
}).catch((err) => { this.log.error('Error untagging peer %s with topic %s', peerId, topic, err) })
3088+
}
30263089
}
30273090

30283091
export function gossipsub (

test/gossip.spec.ts

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,67 @@ describe('gossip', () => {
9999
expect(publishResult.recipients).to.deep.equal([])
100100
})
101101

102+
it('should tag peers', async function () {
103+
this.timeout(10e4)
104+
const nodeA = nodes[0]
105+
const nodeB = nodes[1]
106+
const topic = 'Z'
107+
108+
const twoNodes = [nodeA, nodeB]
109+
110+
const graftPromises = twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:graft'))
111+
112+
// add subscriptions to each node
113+
twoNodes.forEach((n) => { n.pubsub.subscribe(topic) })
114+
115+
// every node connected to every other
116+
await connectAllPubSubNodes(twoNodes)
117+
118+
// await grafts
119+
await Promise.all(graftPromises)
120+
121+
// await mesh rebalancing
122+
await Promise.all(twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat')))
123+
124+
const peerInfoA = await nodeA.components.peerStore.get(nodeB.components.peerId).catch((e) => undefined)
125+
const peerInfoB = await nodeB.components.peerStore.get(nodeA.components.peerId).catch((e) => undefined)
126+
expect(peerInfoA?.tags.get(topic)?.value).to.equal(100)
127+
expect(peerInfoB?.tags.get(topic)?.value).to.equal(100)
128+
})
129+
130+
it('should remove the tags upon pruning', async function () {
131+
this.timeout(10e4)
132+
const nodeA = nodes[0]
133+
const nodeB = nodes[1]
134+
const topic = 'Z'
135+
136+
const twoNodes = [nodeA, nodeB]
137+
138+
const subscriptionPromises = nodes.map(async (n) => pEvent(n.pubsub, 'subscription-change'))
139+
// add subscriptions to each node
140+
twoNodes.forEach((n) => { n.pubsub.subscribe(topic) })
141+
142+
// every node connected to every other
143+
await connectAllPubSubNodes(nodes)
144+
145+
// await for subscriptions to be transmitted
146+
await Promise.all(subscriptionPromises)
147+
148+
// await mesh rebalancing
149+
await Promise.all(twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat')))
150+
151+
twoNodes.forEach((n) => { n.pubsub.unsubscribe(topic) })
152+
153+
// await for unsubscriptions to be transmitted
154+
// await mesh rebalancing
155+
await Promise.all(twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat')))
156+
157+
const peerInfoA = await nodeA.components.peerStore.get(nodeB.components.peerId).catch((e) => undefined)
158+
const peerInfoB = await nodeB.components.peerStore.get(nodeA.components.peerId).catch((e) => undefined)
159+
expect(peerInfoA?.tags.get(topic)).to.be.undefined()
160+
expect(peerInfoB?.tags.get(topic)).to.be.undefined()
161+
})
162+
102163
it('should reject incoming messages bigger than maxInboundDataLength limit', async function () {
103164
this.timeout(10e4)
104165
const nodeA = nodes[0]

0 commit comments

Comments
 (0)