Skip to content

Commit 94009f9

Browse files
committed
[DHT] Rework hash table status RPC command - WIP
1 parent a8d0442 commit 94009f9

File tree

3 files changed

+106
-122
lines changed

3 files changed

+106
-122
lines changed

src/dht/rpcdht.cpp

+14-69
Original file line numberDiff line numberDiff line change
@@ -214,78 +214,23 @@ static UniValue GetDHTStatus(const JSONRPCRequest& request)
214214
if (!DHT::SessionStatus())
215215
throw JSONRPCError(RPC_DHT_NOT_STARTED, strprintf("dht %s failed. DHT session not started.", request.params[0].get_str()));
216216

217-
libtorrent::session_status stats;
218-
std::vector<libtorrent::dht_lookup> vchDHTLookup;
219-
std::vector<libtorrent::dht_routing_bucket> vchDHTBuckets;
220-
DHT::GetDHTStats(0, stats, vchDHTLookup, vchDHTBuckets);
221-
217+
CSessionStats stats;
218+
DHT::GetDHTStats(stats);
222219
UniValue result(UniValue::VOBJ);
223-
result.push_back(Pair("num_peers", stats.num_peers));
224-
result.push_back(Pair("peerlist_size", stats.peerlist_size));
225-
result.push_back(Pair("active_request_size", (int)stats.active_requests.size()));
226-
result.push_back(Pair("dht_node_cache", stats.dht_node_cache));
227-
result.push_back(Pair("dht_global_nodes", stats.dht_global_nodes));
228-
result.push_back(Pair("dht_download_rate", stats.dht_download_rate));
229-
result.push_back(Pair("dht_upload_rate", stats.dht_upload_rate));
230-
result.push_back(Pair("dht_total_allocations", stats.dht_total_allocations));
231-
result.push_back(Pair("download_rate", stats.download_rate));
232-
result.push_back(Pair("upload_rate", stats.upload_rate));
233-
result.push_back(Pair("total_download", stats.total_download));
234-
result.push_back(Pair("total_upload", stats.total_upload));
235-
result.push_back(Pair("total_dht_download", stats.total_dht_download));
236-
result.push_back(Pair("total_dht_upload", stats.total_dht_upload));
237-
result.push_back(Pair("total_ip_overhead_download", stats.total_ip_overhead_download));
238-
result.push_back(Pair("total_ip_overhead_upload", stats.total_ip_overhead_upload));
239-
result.push_back(Pair("total_payload_download", stats.total_payload_download));
240-
result.push_back(Pair("total_payload_upload", stats.total_payload_upload));
241-
result.push_back(Pair("dht_nodes", stats.dht_nodes));
242-
result.push_back(Pair("dht_torrents", stats.dht_torrents));
243-
244-
for (const libtorrent::dht_routing_bucket& bucket : vchDHTBuckets){
245-
UniValue oBucket(UniValue::VOBJ);
246-
oBucket.push_back(Pair("num_nodes", bucket.num_nodes));
247-
oBucket.push_back(Pair("num_replacements", bucket.num_replacements));
248-
oBucket.push_back(Pair("last_active", bucket.last_active));
249-
result.push_back(Pair("bucket", oBucket));
220+
result.push_back(Pair("num_sessions", stats.nSessions));
221+
result.push_back(Pair("put_records", stats.nPutRecords));
222+
result.push_back(Pair("put_pieces", stats.nPutPieces));
223+
result.push_back(Pair("put_bytes", stats.nPutBytes));
224+
result.push_back(Pair("get_records", stats.nGetRecords));
225+
result.push_back(Pair("get_pieces", stats.nGetPieces));
226+
result.push_back(Pair("get_bytes", stats.nGetBytes));
227+
result.push_back(Pair("get_errors", stats.nGetErrors));
228+
229+
for (const std::pair<std::string, std::string>& pairMessage : stats.vMessages)
230+
{
231+
result.push_back(Pair(pairMessage.first, pairMessage.second));
250232
}
251233

252-
for (const libtorrent::dht_lookup& lookup : vchDHTLookup) {
253-
UniValue oLookup(UniValue::VOBJ);
254-
oLookup.push_back(Pair("outstanding_requests", lookup.outstanding_requests));
255-
oLookup.push_back(Pair("timeouts", lookup.timeouts));
256-
oLookup.push_back(Pair("responses", lookup.responses));
257-
oLookup.push_back(Pair("branch_factor", lookup.branch_factor));
258-
oLookup.push_back(Pair("nodes_left", lookup.nodes_left));
259-
oLookup.push_back(Pair("last_sent", lookup.last_sent));
260-
oLookup.push_back(Pair("first_timeout", lookup.first_timeout));
261-
// string literal indicating which kind of lookup this is
262-
// char const* type;
263-
// the node-id or info-hash target for this lookup
264-
//sha1_hash target;
265-
result.push_back(oLookup);
266-
result.push_back(Pair("lookup", oLookup));
267-
}
268-
/*
269-
result.push_back(Pair("ip_overhead_download_rate", stats.ip_overhead_download_rate));
270-
result.push_back(Pair("ip_overhead_upload_rate", stats.ip_overhead_upload_rate));
271-
result.push_back(Pair("payload_download_rate", stats.payload_download_rate));
272-
result.push_back(Pair("payload_upload_rate", stats.payload_upload_rate));
273-
result.push_back(Pair("tracker_upload_rate", stats.tracker_upload_rate));
274-
result.push_back(Pair("tracker_download_rate", stats.tracker_download_rate));
275-
result.push_back(Pair("total_tracker_download", stats.total_tracker_download));
276-
result.push_back(Pair("total_tracker_upload", stats.total_tracker_upload));
277-
result.push_back(Pair("total_redundant_bytes", stats.total_redundant_bytes));
278-
result.push_back(Pair("total_failed_bytes", stats.total_failed_bytes));
279-
result.push_back(Pair("num_unchoked", stats.num_unchoked));
280-
result.push_back(Pair("allowed_upload_slots", stats.allowed_upload_slots));
281-
result.push_back(Pair("up_bandwidth_queue", stats.up_bandwidth_queue));
282-
result.push_back(Pair("down_bandwidth_queue", stats.down_bandwidth_queue));
283-
result.push_back(Pair("up_bandwidth_bytes_queue", stats.up_bandwidth_bytes_queue));
284-
result.push_back(Pair("down_bandwidth_bytes_queue", stats.down_bandwidth_bytes_queue));
285-
result.push_back(Pair("optimistic_unchoke_counter", stats.optimistic_unchoke_counter));
286-
result.push_back(Pair("unchoke_counter", stats.unchoke_counter));
287-
result.push_back(Pair("has_incoming_connections", stats.has_incoming_connections));
288-
*/
289234
return result;
290235
}
291236

src/dht/session.cpp

+69-48
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@
2323

2424
#include "libtorrent/alert_types.hpp"
2525
#include "libtorrent/bencode.hpp" // for bencode()
26-
#include <libtorrent/hex.hpp> // for to_hex
26+
#include "libtorrent/hex.hpp" // for to_hex
2727
#include "libtorrent/kademlia/ed25519.hpp"
28-
#include <libtorrent/kademlia/item.hpp> // for sign_mutable_item
28+
#include "libtorrent/kademlia/item.hpp" // for sign_mutable_item
29+
#include "libtorrent/session_stats.hpp"
2930
#include "libtorrent/span.hpp"
3031

32+
#include <boost/algorithm/string.hpp>
3133
#include <boost/filesystem.hpp>
3234
#include <boost/thread/thread.hpp>
3335

@@ -53,6 +55,12 @@ static std::shared_ptr<std::thread> pDHTTorrentThread;
5355
static std::shared_ptr<boost::thread> pReannounceThread = nullptr;
5456
static std::map<HashRecordKey, uint32_t> mPutCommands;
5557
static uint64_t nPutRecords = 0;
58+
static uint64_t nPutPieces = 0;
59+
static uint64_t nPutBytes = 0;
60+
static uint64_t nGetRecords = 0;
61+
static uint64_t nGetPieces = 0;
62+
static uint64_t nGetBytes = 0;
63+
static uint64_t nGetErrors = 0;
5664

5765
static bool fStarted;
5866
static bool fReannounceStarted = false;
@@ -160,13 +168,15 @@ void StartEventListener(std::shared_ptr<CHashTableSession> dhtSession)
160168
dhtSession->AddToDHTGetEventMap(infoHash, event);
161169
}
162170
}
163-
}
164-
else if (iAlertType == DHT_STATS_ALERT_TYPE_CODE) {
165-
// TODO (dht): handle stats
166-
//dht_stats_alert* pAlert = alert_cast<dht_stats_alert>((*iAlert));
167-
LogPrintf("%s -- DHT Alert Message: AlertType = %s\n", __func__, strAlertTypeName);
168-
}
169-
else {
171+
} else if (iAlertType == DHT_STATS_ALERT_TYPE_CODE) {
172+
LogPrintf("%s -- DHT Status Alert Message: AlertType = %s\n", __func__, strAlertTypeName);
173+
dht_stats_alert* pAlert = alert_cast<dht_stats_alert>((*iAlert));
174+
dhtSession->DHTStats = pAlert;
175+
} else if (iAlertType == STATS_ALERT_TYPE_CODE) {
176+
LogPrintf("%s -- Status Alert Message: AlertType = %s\n", __func__, strAlertTypeName);
177+
session_stats_alert* pAlert = alert_cast<session_stats_alert>((*iAlert));
178+
dhtSession->SessionStats = pAlert;
179+
} else {
170180
const CEvent event(strAlertMessage, iAlertType, iAlertCategory, strAlertTypeName);
171181
dhtSession->AddToEventMap(iAlertType, event);
172182
}
@@ -452,38 +462,6 @@ void StopTorrentDHTNetwork()
452462
LogPrintf("%s --Finished stopping all DHT session threads.\n", __func__);
453463
}
454464

455-
void CHashTableSession::GetDHTStats(session_status& stats, std::vector<dht_lookup>& vchDHTLookup, std::vector<dht_routing_bucket>& vchDHTBuckets)
456-
{
457-
LogPrint("dht", "DHTTorrentNetwork -- GetDHTStats started.\n");
458-
459-
if (!Session) {
460-
return;
461-
}
462-
463-
if (!Session->is_dht_running()) {
464-
return;
465-
//LogPrint("dht", "DHTTorrentNetwork -- GetDHTStats Restarting DHT.\n");
466-
//if (!LoadSessionState(Session)) {
467-
// LogPrint("dht", "DHTTorrentNetwork -- GetDHTStats Couldn't load previous settings. Trying to bootstrap again.\n");
468-
// Bootstrap();
469-
//}
470-
//else {
471-
// LogPrint("dht", "DHTTorrentNetwork -- GetDHTStats setting loaded from file.\n");
472-
//}
473-
}
474-
else {
475-
LogPrint("dht", "DHTTorrentNetwork -- GetDHTStats DHT already running. Bootstrap not needed.\n");
476-
}
477-
478-
Session->post_dht_stats();
479-
//get alert from map
480-
//alert* dhtAlert = WaitForResponse(Session, dht_stats_alert::alert_type);
481-
//dht_stats_alert* dhtStatsAlert = alert_cast<dht_stats_alert>(dhtAlert);
482-
//vchDHTLookup = dhtStatsAlert->active_requests;
483-
//vchDHTBuckets = dhtStatsAlert->routing_table;
484-
stats = Session->status();
485-
}
486-
487465
void CleanUpPutCommandMap()
488466
{
489467
int64_t nCurrentTime = GetTime();
@@ -614,8 +592,11 @@ bool CHashTableSession::SubmitGetRecord(const std::array<char, 32>& public_key,
614592
CDataRecord getRecord(strOperationType, nTotalSlots, header, vChunks, Array32ToVector(private_seed));
615593
if (record.HasError()) {
616594
strErrorMessage = strprintf("Record has errors: %s\n", __func__, getRecord.ErrorMessage());
595+
nGetErrors++;
617596
return false;
618597
}
598+
nGetPieces += header.nChunks + 1;
599+
nGetBytes += header.nDataSize;
619600
record = getRecord;
620601
return true;
621602
}
@@ -886,11 +867,13 @@ bool SubmitPut(const std::array<char, 32> public_key, const std::array<char, 64>
886867
return false;
887868
}
888869
arraySessions[nCounter].second->SubmitPut(public_key, private_key, lastSequence, pair.first, pair.second);
889-
LogPrintf("%s -- salt: %s, value: %s\n", __func__, pair.first, pair.second.to_string());
870+
LogPrintf("%s -- thread: %d, salt: %s, value: %s\n", __func__, nCounter, pair.first, pair.second.to_string());
890871
if (fMultiThreads)
891872
nCounter++;
892873
}
893874
nPutRecords++;
875+
nPutPieces += record.GetHeader().nChunks + 1;
876+
nPutBytes += record.GetHeader().nDataSize + record.GetHeader().HexValue().size();
894877

895878
if (nPutRecords % 32 == 0)
896879
CleanUpPutCommandMap();
@@ -933,6 +916,7 @@ bool SubmitGetRecord(const size_t nSessionThread, const std::array<char, 32>& pu
933916
if (!arraySessions[nSessionThread].second)
934917
return false;
935918

919+
nGetRecords++;
936920
return arraySessions[nSessionThread].second->SubmitGetRecord(public_key, private_seed, strOperationType, iSequence, record);
937921
}
938922

@@ -969,15 +953,52 @@ bool GetAllDHTGetEvents(const size_t nSessionThread, std::vector<CMutableGetEven
969953
return arraySessions[nSessionThread].second->GetAllDHTGetEvents(vchGetEvents);
970954
}
971955

972-
void GetDHTStats(const size_t nSessionThread, libtorrent::session_status& stats, std::vector<libtorrent::dht_lookup>& vchDHTLookup, std::vector<libtorrent::dht_routing_bucket>& vchDHTBuckets)
956+
void GetDHTStats(CSessionStats& stats)
973957
{
974-
if (nSessionThread >= nThreads)
975-
return;
958+
CSessionStats newStats;
959+
size_t nRunningThreads = fMultiThreads ? nThreads : 1;
960+
for (unsigned int i = 0; i < nRunningThreads; i++) {
961+
if (!arraySessions[i].second || !arraySessions[i].second->Session)
962+
return;
963+
//arraySessions[i].second->Session->post_dht_stats();
964+
arraySessions[i].second->Session->post_session_stats();
965+
}
966+
// test get
967+
MilliSleep(333);
976968

977-
if (!arraySessions[nSessionThread].second)
978-
return;
969+
std::vector<libtorrent::stats_metric> vStats = session_stats_metrics();
970+
newStats.nSessions = nRunningThreads;
971+
for (unsigned int i = 0; i < nRunningThreads; i++) {
972+
libtorrent::session_stats_alert* statsAlert = arraySessions[i].second->SessionStats;
973+
if (statsAlert) {
974+
std::string strMessage = statsAlert->message();
975+
std::vector<std::string> vSplit1;
976+
boost::split(vSplit1, strMessage, boost::is_any_of(":"));
977+
if (vSplit1.size() > 1) {
978+
unsigned int x = 0;
979+
std::vector<std::string> vSplit2;
980+
boost::split(vSplit2, vSplit1[1], boost::is_any_of(","));
981+
for (const std::string& strValue : vSplit2) {
982+
if (std::string(vStats[x].name).find("dht.") == 0) {
983+
std::string strThreadName = "thread[" + std::to_string(i + 1) + "]";
984+
newStats.vMessages.push_back(std::make_pair(strThreadName + std::string(vStats[x].name), strValue));
985+
}
986+
x++;
987+
}
988+
}
989+
}
990+
}
991+
992+
newStats.nPutRecords = nPutRecords;
993+
newStats.nPutPieces = nPutPieces;
994+
newStats.nPutBytes = nPutBytes;
995+
newStats.nGetRecords = nGetRecords;
996+
newStats.nGetPieces = nGetPieces;
997+
newStats.nGetBytes = nGetBytes;
998+
newStats.nGetErrors = nGetErrors;
979999

980-
arraySessions[nSessionThread].second->GetDHTStats(stats, vchDHTLookup, vchDHTBuckets);
1000+
// get dht_global_nodes
1001+
stats = newStats;
9811002
}
9821003

9831004
} // end DHT namespace

src/dht/session.h

+23-5
Original file line numberDiff line numberDiff line change
@@ -39,25 +39,44 @@ static constexpr int DHT_GET_ALERT_TYPE_CODE = 75;
3939
static constexpr int DHT_PUT_ALERT_TYPE_CODE = 76;
4040
static constexpr int DHT_BOOTSTRAP_ALERT_TYPE_CODE = 62;
4141
static constexpr int DHT_STATS_ALERT_TYPE_CODE = 83;
42+
static constexpr int STATS_ALERT_TYPE_CODE = 70;
4243
static constexpr int DHT_ERROR_ALERT_TYPE_CODE = 73;
4344
static constexpr int64_t DHT_RECORD_LOCK_SECONDS = 16;
4445
static constexpr uint32_t DHT_KEEP_PUT_BUFFER_SECONDS = 300;
4546

4647
typedef std::pair<std::array<char, 32>, std::string> HashRecordKey; // public key and salt pair
4748

49+
class CSessionStats {
50+
public:
51+
uint8_t nSessions = 0;
52+
std::vector<std::pair<std::string, std::string>> vMessages;
53+
uint64_t nPutRecords = 0;
54+
uint64_t nPutPieces = 0;
55+
uint64_t nPutBytes = 0;
56+
uint64_t nGetRecords = 0;
57+
uint64_t nGetPieces = 0;
58+
uint64_t nGetBytes = 0;
59+
uint64_t nGlobalNodes = 0;
60+
uint64_t nGetErrors = 0;
61+
62+
CSessionStats() {}
63+
};
64+
4865
class CHashTableSession {
4966
public:
5067
std::string strName;
5168
CDataRecordBuffer vDataEntries;
52-
libtorrent::session* Session = NULL;
69+
libtorrent::session* Session = nullptr;
5370
std::string strErrorMessage;
54-
bool fShutdown;
71+
bool fShutdown = false;
5572
EventTypeMap m_EventTypeMap;
5673
DHTGetEventMap m_DHTGetEventMap;
74+
libtorrent::dht_stats_alert* DHTStats = nullptr;
75+
libtorrent::session_stats_alert* SessionStats = nullptr;
5776
CCriticalSection cs_EventMap;
5877
CCriticalSection cs_DHTGetEventMap;
5978

60-
CHashTableSession() : vDataEntries(CDataRecordBuffer(32)), strErrorMessage(""), fShutdown(false) {};
79+
CHashTableSession() : strName(""), vDataEntries(CDataRecordBuffer(32)), strErrorMessage(""), fShutdown(false) {};
6180

6281
bool SubmitPut(const std::array<char, 32> public_key, const std::array<char, 64> private_key, const int64_t lastSequence, const std::string& strSalt, const libtorrent::entry& entryValue);
6382

@@ -68,7 +87,6 @@ class CHashTableSession {
6887
bool SubmitGetRecord(const std::array<char, 32>& public_key, const std::array<char, 32>& private_seed, const std::string& strOperationType, int64_t& iSequence, CDataRecord& record);
6988
bool SubmitGetAllRecordsAsync(const std::vector<CLinkInfo>& vchLinkInfo, const std::string& strOperationType, std::vector<CDataRecord>& vchRecords);
7089
bool SubmitGetAllRecordsSync(const std::vector<CLinkInfo>& vchLinkInfo, const std::string& strOperationType, std::vector<CDataRecord>& vchRecords);
71-
void GetDHTStats(libtorrent::session_status& stats, std::vector<libtorrent::dht_lookup>& vchDHTLookup, std::vector<libtorrent::dht_routing_bucket>& vchDHTBuckets);
7290
bool Bootstrap();
7391
bool GetAllDHTGetEvents(std::vector<CMutableGetEvent>& vchGetEvents);
7492
void AddToDHTGetEventMap(const std::string& infoHash, const CMutableGetEvent& event);
@@ -110,7 +128,7 @@ namespace DHT
110128
bool SubmitGetAllRecordsSync(const size_t nSessionThread, const std::vector<CLinkInfo>& vchLinkInfo, const std::string& strOperationType, std::vector<CDataRecord>& vchRecords);
111129
bool SubmitGetAllRecordsAsync(const size_t nSessionThread, const std::vector<CLinkInfo>& vchLinkInfo, const std::string& strOperationType, std::vector<CDataRecord>& vchRecords);
112130
bool GetAllDHTGetEvents(const size_t nSessionThread, std::vector<CMutableGetEvent>& vchGetEvents);
113-
void GetDHTStats(const size_t nSessionThread, libtorrent::session_status& stats, std::vector<libtorrent::dht_lookup>& vchDHTLookup, std::vector<libtorrent::dht_routing_bucket>& vchDHTBuckets);
131+
void GetDHTStats(CSessionStats& stats);
114132
}
115133

116134
#endif // DYNAMIC_DHT_SESSION_H

0 commit comments

Comments
 (0)