Skip to content

Commit

Permalink
Harvester refactor for async lookups
Browse files Browse the repository at this point in the history
  • Loading branch information
madMAx43v3r committed Nov 12, 2023
1 parent 081f419 commit 1dbbcee
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 69 deletions.
3 changes: 3 additions & 0 deletions include/mmx/Harvester.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class Harvester : public HarvesterBase {

void find_plot_dirs(const std::set<std::string>& dirs, std::set<std::string>& all_dirs) const;

// thread safe
void send_response( std::shared_ptr<const Challenge> request, std::shared_ptr<const mmx::chiapos::Proof> chia_proof,
const virtual_plot_info_t* virtual_plot, const hash_t& plot_id, const uint32_t score, const int64_t time_begin_ms) const;

Expand Down Expand Up @@ -86,6 +87,8 @@ class Harvester : public HarvesterBase {
std::shared_ptr<vnx::Timer> lookup_timer;
std::shared_ptr<vnx::addons::HttpInterface<Harvester>> http;

mutable std::mutex mutex;

friend class vnx::addons::HttpInterface<Harvester>;

};
Expand Down
125 changes: 56 additions & 69 deletions src/Harvester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void Harvester::main()
http = std::make_shared<vnx::addons::HttpInterface<Harvester>>(this, vnx_name);
add_async_client(http);

threads = std::make_shared<vnx::ThreadPool>(num_threads);
threads = std::make_shared<vnx::ThreadPool>(num_threads, num_threads);
lookup_timer = add_timer(std::bind(&Harvester::check_queue, this));

set_timer_millis(10000, std::bind(&Harvester::update, this));
Expand All @@ -98,6 +98,9 @@ void Harvester::send_response( std::shared_ptr<const Challenge> request, std::sh
out->harvester = host_name;
out->lookup_time_ms = vnx::get_wall_time_millis() - time_begin_ms;

const auto delay_sec = out->lookup_time_ms / 1e3;
log(INFO) << "Found proof with score " << score << " for height " << request->height << ", delay " << delay_sec << " sec";

vnx::optional<skey_t> local_sk;

if(chia_proof) {
Expand Down Expand Up @@ -129,12 +132,18 @@ void Harvester::send_response( std::shared_ptr<const Challenge> request, std::sh
proof->plot_key = virtual_plot->farmer_key;
out->proof = proof;
}
else {
return;
}

try {
out->hash = out->calc_hash();
out->content_hash = out->calc_hash(true);
// TODO: have node sign it after verify
out->farmer_sig = farmer->sign_proof(out, local_sk);
{
std::lock_guard<std::mutex> lock(mutex);
// TODO: have node sign it after verify
out->farmer_sig = farmer->sign_proof(out, local_sk);
}
out->content_hash = out->calc_hash(true);
publish(out, output_proofs);
}
Expand Down Expand Up @@ -182,46 +191,20 @@ void Harvester::handle(std::shared_ptr<const Challenge> value)

void Harvester::lookup_task(std::shared_ptr<const Challenge> value, const int64_t recv_time_ms) const
{
const auto time_begin = vnx::get_wall_time_millis();

std::mutex mutex;
uint256_t best_score = uint256_max;
std::atomic<uint64_t> num_passed {0};

for(const auto& entry : virtual_map)
{
threads->add_task([this, entry, value, time_begin, &best_score, &num_passed, &mutex]()
{
if(check_plot_filter(params, value->challenge, entry.first)) {
try {
std::lock_guard<std::mutex> lock(mutex);
const auto balance = node->get_virtual_plot_balance(entry.first, value->diff_block_hash);
if(balance) {
const auto score = calc_virtual_score(params, value->challenge, entry.first, balance, value->space_diff);
if(score < params->score_threshold) {
send_response(value, nullptr, &entry.second, entry.first, score, time_begin);
}
best_score = std::min(best_score, score);
}
} catch(const std::exception& ex) {
log(WARN) << "[" << host_name << "] Failed to check virtual plot: " << ex.what() << " (" << entry.first << ")";
}
num_passed++;
}
});
}
struct lookup_job_t {
size_t total_plots = 0;
std::atomic<uint64_t> num_left {0};
std::atomic<uint64_t> num_passed {0};
};
const auto job = std::make_shared<lookup_job_t>();
job->total_plots = id_map.size();
job->num_left = job->total_plots;

struct {
hash_t plot_id;
hash_t challenge;
uint32_t index = 0;
uint256_t score = uint256_max;
std::shared_ptr<chiapos::DiskProver> prover;
} best_entry;
const auto max_delay_sec = params->block_time * value->max_delay;

for(const auto& entry : id_map)
{
threads->add_task([this, entry, value, &best_entry, &best_score, &num_passed, &mutex]()
threads->add_task([this, entry, value, job, max_delay_sec, recv_time_ms]()
{
if(check_plot_filter(params, value->challenge, entry.first))
{
Expand All @@ -232,53 +215,57 @@ void Harvester::lookup_task(std::shared_ptr<const Challenge> value, const int64_
const auto challenge = get_plot_challenge(value->challenge, entry.first);
const auto qualities = prover->get_qualities(challenge.bytes);

std::lock_guard<std::mutex> lock(mutex);

for(size_t k = 0; k < qualities.size(); ++k) {
const auto score = calc_proof_score(params, prover->get_ksize(), hash_t::from_bytes(qualities[k]), value->space_diff);
if(score < params->score_threshold) {
if(score < best_entry.score) {
best_entry.index = k;
best_entry.score = score;
best_entry.plot_id = entry.first;
best_entry.challenge = challenge;
best_entry.prover = prover;
try {
const auto proof = prover->get_full_proof(challenge.bytes, k);
send_response(value, proof, nullptr, entry.first, score, recv_time_ms);
} catch(const std::exception& ex) {
log(WARN) << "[" << host_name << "] Failed to fetch full proof: " << ex.what() << " (" << prover->get_file_path() << ")";
}
}
best_score = std::min(best_score, score);
}
} catch(const std::exception& ex) {
log(WARN) << "[" << host_name << "] Failed to fetch qualities: " << ex.what() << " (" << prover->get_file_path() << ")";
}
}
num_passed++;
job->num_passed++;
}
if(job->num_left-- == 1) {
const auto delay_sec = (vnx::get_wall_time_millis() - recv_time_ms) / 1e3;
if(delay_sec > max_delay_sec) {
log(WARN) << "[" << host_name << "] Lookup for height "
<< value->height << " took longer than allowable delay: " << delay_sec << " sec";
}
log(INFO) << "[" << host_name << "] " << job->num_passed << " / " << job->total_plots
<< " plots were eligible for height " << value->height << ", delay " << delay_sec << " sec";
}
});
}
threads->sync();

if(auto prover = best_entry.prover) {
try {
const auto proof = prover->get_full_proof(best_entry.challenge.bytes, best_entry.index);
send_response(value, proof, nullptr, best_entry.plot_id, best_entry.score, time_begin);
} catch(const std::exception& ex) {
log(WARN) << "[" << host_name << "] Failed to fetch full proof: " << ex.what() << " (" << prover->get_file_path() << ")";
for(const auto& entry : virtual_map)
{
if(check_plot_filter(params, value->challenge, entry.first)) {
try {
const auto balance = node->get_virtual_plot_balance(entry.first, value->diff_block_hash);
if(balance) {
const auto score = calc_virtual_score(params, value->challenge, entry.first, balance, value->space_diff);
if(score < params->score_threshold) {
send_response(value, nullptr, &entry.second, entry.first, score, recv_time_ms);
}
}
} catch(const std::exception& ex) {
log(WARN) << "[" << host_name << "] Failed to check virtual plot: " << ex.what() << " (" << entry.first << ")";
}
}
}

const auto now_ms = vnx::get_wall_time_millis();
const auto time_sec = (now_ms - time_begin) / 1e3;
const auto delay_sec = (now_ms - recv_time_ms) / 1e3;

if(delay_sec > params->block_time * value->max_delay) {
log(WARN) << "[" << host_name << "] Lookup for height " << value->height << " took longer than allowable delay: " << delay_sec << " sec";
}
if(!id_map.empty()) {
log(INFO) << "[" << host_name << "] " << num_passed << " plots were eligible for height " << value->height
<< ", best score was " << (best_score != uint256_max ? best_score.str() : "N/A")
<< ", took " << time_sec << " sec, delay " << delay_sec << " sec";
const auto delay_sec = (vnx::get_wall_time_millis() - recv_time_ms) / 1e3;
if(delay_sec > max_delay_sec) {
log(WARN) << "[" << host_name << "] Virtual plots check for height " << value->height << " took longer than allowable delay: " << delay_sec << " sec";
}
lookup_timer->set_millis(10);

lookup_timer->set_millis(0);
}

uint64_t Harvester::get_total_bytes() const
Expand Down

0 comments on commit 1dbbcee

Please sign in to comment.