forked from stellar/stellar-core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPendingEnvelopes.h
149 lines (118 loc) · 4.46 KB
/
PendingEnvelopes.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#pragma once
#include "crypto/SecretKey.h"
#include "herder/Herder.h"
#include "herder/QuorumTracker.h"
#include "lib/json/json.h"
#include "lib/util/lrucache.hpp"
#include "overlay/ItemFetcher.h"
#include <autocheck/function.hpp>
#include <map>
#include <medida/medida.h>
#include <queue>
#include <set>
#include <util/optional.h>
/*
SCP messages that we have received but that are still waiting on additional
information before they can be fed into SCP
*/
namespace stellar
{
// Forward declaration: PendingEnvelopes only holds a reference to HerderImpl,
// so the full definition is not needed in this header.
class HerderImpl;
// Envelopes known for a single slot, grouped by the state each envelope is
// currently in. Used as the mapped value of PendingEnvelopes::mEnvelopes.
struct SlotEnvelopes
{
// list of envelopes we have processed already
std::vector<SCPEnvelope> mProcessedEnvelopes;
// set of envelopes we have discarded already
// (std::set: relies on an ordering for SCPEnvelope declared elsewhere)
std::set<SCPEnvelope> mDiscardedEnvelopes;
// set of envelopes we are fetching right now
std::set<SCPEnvelope> mFetchingEnvelopes;
// list of ready envelopes that haven't been sent to SCP yet
std::vector<SCPEnvelope> mReadyEnvelopes;
};
// Keeps track of the SCP envelopes received for each slot (see SlotEnvelopes
// for the per-slot states) together with caches of the quorum sets and tx
// sets learned about so far.
//
// NOTE(review): only declarations are visible in this header. Comments below
// that say "presumably" are inferred from names/signatures and must be
// confirmed against the implementation file.
class PendingEnvelopes
{
Application& mApp;
HerderImpl& mHerder;
// ledger# and list of envelopes in various states
std::map<uint64, SlotEnvelopes> mEnvelopes;
// all the quorum sets we have learned about
cache::lru_cache<Hash, SCPQuorumSetPtr> mQsetCache;
// one ItemFetcher per kind of item: tx sets and quorum sets
ItemFetcher mTxSetFetcher;
ItemFetcher mQuorumSetFetcher;
// cache entry pairing a uint64 with a tx set; presumably the uint64 is the
// lastSeenSlotIndex passed to addTxSet - TODO confirm
using TxSetFramCacheItem = std::pair<uint64, TxSetFramePtr>;
// all the txsets we have learned about per ledger#
cache::lru_cache<Hash, TxSetFramCacheItem> mTxSetCache;
// presumably set when mQuorumTracker needs to be rebuilt
// (see rebuildQuorumTrackerState) - TODO confirm
bool mRebuildQuorum;
QuorumTracker mQuorumTracker;
// metrics counters; presumably one per SlotEnvelopes state, refreshed by
// updateMetrics - TODO confirm
medida::Counter& mProcessedCount;
medida::Counter& mDiscardedCount;
medida::Counter& mFetchingCount;
medida::Counter& mReadyCount;
// discards all SCP envelopes that use the QSet with the given hash,
// as it is not a sane QSet
void discardSCPEnvelopesWithQSet(Hash hash);
// presumably refreshes the medida counters above - TODO confirm
void updateMetrics();
public:
PendingEnvelopes(Application& app, HerderImpl& herder);
~PendingEnvelopes();
/**
* Process received @p envelope.
*
* Return status of received envelope.
*/
Herder::EnvelopeStatus recvSCPEnvelope(SCPEnvelope const& envelope);
/**
* Add @p qset identified by @p hash to local cache. Notifies
* @see ItemFetcher about that event - it may cause calls to Herder's
* recvSCPEnvelope which in turn may cause calls to @see recvSCPEnvelope
* in PendingEnvelopes.
*/
void addSCPQuorumSet(Hash hash, const SCPQuorumSet& qset);
/**
* Check if @p qset identified by @p hash was requested before from peers.
* If not, ignores that @p qset. If it was requested, calls
* @see addSCPQuorumSet.
*
* Return true if SCPQuorumSet is sane and useful (was asked for).
*/
bool recvSCPQuorumSet(Hash hash, const SCPQuorumSet& qset);
/**
* Add @p txset identified by @p hash to local cache. Notifies
* @see ItemFetcher about that event - it may cause calls to Herder's
* recvSCPEnvelope which in turn may cause calls to @see recvSCPEnvelope
* in PendingEnvelopes.
*/
void addTxSet(Hash hash, uint64 lastSeenSlotIndex, TxSetFramePtr txset);
/**
* Check if @p txset identified by @p hash was requested before from peers.
* If not, ignores that @p txset. If it was requested, calls
* @see addTxSet.
*
* Return true if TxSet useful (was asked for).
*/
bool recvTxSet(Hash hash, TxSetFramePtr txset);
// NOTE(review): the members below carry no documentation in this header;
// the remarks are inferred from names and signatures only - confirm against
// the implementation before relying on them.

// presumably moves @p envelope into the discarded state - TODO confirm
void discardSCPEnvelope(SCPEnvelope const& envelope);
// presumably records that @p peer does not have the item @p itemID of the
// given message @p type - TODO confirm
void peerDoesntHave(MessageType type, Hash const& itemID,
Peer::pointer peer);
// presumably true if @p envelope was discarded earlier - TODO confirm
bool isDiscarded(SCPEnvelope const& envelope) const;
// presumably true if everything @p envelope refers to is already available
// locally - TODO confirm (note: not declared const)
bool isFullyFetched(SCPEnvelope const& envelope);
// presumably start / stop fetching the items @p envelope refers to
// - TODO confirm
void startFetch(SCPEnvelope const& envelope);
void stopFetch(SCPEnvelope const& envelope);
// presumably refreshes the cache entries used by @p envelope - TODO confirm
void touchFetchCache(SCPEnvelope const& envelope);
// presumably marks @p envelope as ready to be handed to SCP - TODO confirm
void envelopeReady(SCPEnvelope const& envelope);
// presumably extracts a ready envelope for @p slotIndex into @p ret and
// returns whether one was available - TODO confirm
bool pop(uint64 slotIndex, SCPEnvelope& ret);
// presumably drops state kept for slots below @p slotIndex - TODO confirm
void eraseBelow(uint64 slotIndex);
// presumably notification that @p slotIndex has closed - TODO confirm
void slotClosed(uint64 slotIndex);
// presumably the slot indexes that currently have ready envelopes
// - TODO confirm
std::vector<uint64> readySlots();
// JSON description of the internal state; meaning of @p limit is not
// visible here - TODO confirm
Json::Value getJsonInfo(size_t limit);
// look up a tx set / quorum set by @p hash; behavior when the hash is
// unknown is not visible here - TODO confirm
TxSetFramePtr getTxSet(Hash const& hash);
SCPQuorumSetPtr getQSet(Hash const& hash);
// returns true if we think that the node is in the transitive quorum for
// sure
bool isNodeDefinitelyInQuorum(NodeID const& node);
// presumably rebuilds the state held by mQuorumTracker - TODO confirm
void rebuildQuorumTrackerState();
// read-only access to the quorum map; presumably the one tracked by
// mQuorumTracker - TODO confirm
QuorumTracker::QuorumMap const& getCurrentlyTrackedQuorum() const;
// updates internal state when an envelope was successfully processed
void envelopeProcessed(SCPEnvelope const& env);
};
}