From 60ab14d328579d0225fa7822ecbe212e587583ff Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 9 Apr 2020 14:02:28 +1200 Subject: [PATCH] Use `async-http-faraday`. --- Gemfile.lock | 31 +- github_changelog_generator.gemspec | 1 + .../generator/generator_fetcher.rb | 7 +- .../octo_fetcher.rb | 275 ++++++++++-------- spec/unit/generator/entry_spec.rb | 10 +- 5 files changed, 191 insertions(+), 133 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index 3c193a6a4..cdd92de8b 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -3,6 +3,7 @@ PATH specs: github_changelog_generator (1.15.1) activesupport + async-http-faraday faraday-http-cache multi_json octokit (~> 4.6) @@ -22,19 +23,38 @@ GEM addressable (2.7.0) public_suffix (>= 2.0.2, < 5.0) ast (2.4.0) + async (1.24.2) + console (~> 1.0) + nio4r (~> 2.3) + timers (~> 4.1) + async-http (0.51.2) + async (~> 1.23) + async-io (~> 1.28) + async-pool (~> 0.2) + protocol-http (~> 0.18.0) + protocol-http1 (~> 0.12.0) + protocol-http2 (~> 0.13.0) + async-http-faraday (0.9.0) + async-http (~> 0.42) + faraday + async-io (1.28.0) + async (~> 1.14) + async-pool (0.2.0) + async (~> 1.8) backports (3.17.1) bump (0.9.0) childprocess (3.0.0) codeclimate-test-reporter (1.0.7) simplecov concurrent-ruby (1.1.6) + console (1.8.2) crack (0.4.3) safe_yaml (~> 1.0.0) diff-lcs (1.3) docile (1.3.2) faraday (0.17.3) multipart-post (>= 1.2, < 3) - faraday-http-cache (2.0.0) + faraday-http-cache (2.1.0) faraday (~> 0.8) hashdiff (1.0.1) i18n (1.8.2) @@ -45,6 +65,7 @@ GEM minitest (5.14.0) multi_json (1.14.1) multipart-post (2.1.1) + nio4r (2.5.2) octokit (4.18.0) faraday (>= 0.9) sawyer (~> 0.8.0, >= 0.5.3) @@ -54,6 +75,13 @@ GEM parallel (1.19.1) parser (2.7.1.0) ast (~> 2.4.0) + protocol-hpack (1.4.2) + protocol-http (0.18.0) + protocol-http1 (0.12.0) + protocol-http (~> 0.18) + protocol-http2 (0.13.3) + protocol-hpack (~> 1.4) + protocol-http (~> 0.18) public_suffix (4.0.4) rainbow (3.0.0) rake (13.0.1) @@ -92,6 +120,7 @@ GEM simplecov-html (~> 0.11) simplecov-html (0.12.2) thread_safe (0.3.6) + timers (4.3.0) tty-color (0.5.1) tzinfo (1.2.7) thread_safe (~> 0.1) diff --git a/github_changelog_generator.gemspec b/github_changelog_generator.gemspec index 3b48ea8a3..0fe545df7 100644 --- a/github_changelog_generator.gemspec +++ b/github_changelog_generator.gemspec @@ -24,6 +24,7 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.add_runtime_dependency("activesupport") + spec.add_runtime_dependency("async-http-faraday") spec.add_runtime_dependency("faraday-http-cache") spec.add_runtime_dependency("multi_json") spec.add_runtime_dependency("octokit", ["~> 4.6"]) diff --git a/lib/github_changelog_generator/generator/generator_fetcher.rb b/lib/github_changelog_generator/generator/generator_fetcher.rb index 4e359f1d3..1187d54ce 100644 --- a/lib/github_changelog_generator/generator/generator_fetcher.rb +++ b/lib/github_changelog_generator/generator/generator_fetcher.rb @@ -2,8 +2,6 @@ module GitHubChangelogGenerator class Generator - MAX_THREAD_NUMBER = 25 - # Fetch event for issues and pull requests # @return [Array] array of fetched issues def fetch_events_for_issues_and_pr @@ -64,7 +62,7 @@ def add_first_occurring_tag_to_prs(tags, prs) # @param [Array] prs The PRs to associate. # @return [Array] PRs without their merge_commit_sha in a tag. def associate_tagged_prs(tags, prs, total) - @fetcher.fetch_tag_shas_async(tags) + @fetcher.fetch_tag_shas(tags) i = 0 prs.reject do |pr| @@ -199,8 +197,7 @@ def set_date_from_event(event, issue) # @return [Boolean] True if SHA is in the branch git history. def sha_in_release_branch(sha) branch = @options[:release_branch] || @fetcher.default_branch - commits_in_branch = @fetcher.fetch_compare(@fetcher.oldest_commit["sha"], branch) - shas_in_branch = commits_in_branch["commits"].collect { |commit| commit["sha"] } + shas_in_branch = @fetcher.commits_in_branch(branch) shas_in_branch.include?(sha) end end diff --git a/lib/github_changelog_generator/octo_fetcher.rb b/lib/github_changelog_generator/octo_fetcher.rb index 9fdfe1221..a5f9664cf 100644 --- a/lib/github_changelog_generator/octo_fetcher.rb +++ b/lib/github_changelog_generator/octo_fetcher.rb @@ -2,6 +2,12 @@ require "tmpdir" require "retriable" +require "set" +require "async" +require "async/barrier" +require "async/semaphore" +require "async/http/faraday" + module GitHubChangelogGenerator # A Fetcher responsible for all requests to GitHub and all basic manipulation with related data # (such as filtering, validating, e.t.c) @@ -9,8 +15,8 @@ module GitHubChangelogGenerator # Example: # fetcher = GitHubChangelogGenerator::OctoFetcher.new(options) class OctoFetcher - PER_PAGE_NUMBER = 100 - MAX_THREAD_NUMBER = 10 + PER_PAGE_NUMBER = 100 + MAXIMUM_CONNECTIONS = 50 MAX_FORBIDDEN_RETRIES = 100 CHANGELOG_GITHUB_TOKEN = "CHANGELOG_GITHUB_TOKEN" GH_RATE_LIMIT_EXCEEDED_MSG = "Warning: Can't finish operation: GitHub API rate limit exceeded, changelog may be " \ @@ -31,47 +37,57 @@ def initialize(options = {}) @project = @options[:project] @since = @options[:since] @http_cache = @options[:http_cache] - @cache_file = nil - @cache_log = nil @commits = [] - @compares = {} - prepare_cache - configure_octokit_ssl - @client = Octokit::Client.new(github_options) + @branches = nil + @graph = nil + @client = nil + end + + def middleware + Faraday::RackBuilder.new do |builder| + if @http_cache + cache_file = @options.fetch(:cache_file) { File.join(Dir.tmpdir, "github-changelog-http-cache") } + cache_log = @options.fetch(:cache_log) { File.join(Dir.tmpdir, "github-changelog-logger.log") } + + builder.use( + Faraday::HttpCache, + serializer: Marshal, + store: ActiveSupport::Cache::FileStore.new(cache_file), + logger: Logger.new(cache_log), + shared_cache: false + ) + end + + builder.use Octokit::Response::RaiseError + builder.adapter :async_http + end end - def prepare_cache - return unless @http_cache + def connection_options + ca_file = @options[:ssl_ca_file] || ENV["SSL_CA_FILE"] || File.expand_path("ssl_certs/cacert.pem", __dir__) - @cache_file = @options.fetch(:cache_file) { File.join(Dir.tmpdir, "github-changelog-http-cache") } - @cache_log = @options.fetch(:cache_log) { File.join(Dir.tmpdir, "github-changelog-logger.log") } - init_cache + Octokit.connection_options.merge({ ssl: { ca_file: ca_file } }) end - def github_options - result = {} - github_token = fetch_github_token - result[:access_token] = github_token if github_token - endpoint = @options[:github_endpoint] - result[:api_endpoint] = endpoint if endpoint - result - end + def client_options + options = { + middleware: middleware, + connection_options: connection_options + } - def configure_octokit_ssl - ca_file = @options[:ssl_ca_file] || ENV["SSL_CA_FILE"] || File.expand_path("ssl_certs/cacert.pem", __dir__) - Octokit.connection_options = { ssl: { ca_file: ca_file } } - end + if (github_token = fetch_github_token) + options[:access_token] = github_token + end - def init_cache - Octokit.middleware = Faraday::RackBuilder.new do |builder| - builder.use(Faraday::HttpCache, serializer: Marshal, - store: ActiveSupport::Cache::FileStore.new(@cache_file), - logger: Logger.new(@cache_log), - shared_cache: false) - builder.use Octokit::Response::RaiseError - builder.adapter Faraday.default_adapter - # builder.response :logger + if (endpoint = @options[:github_endpoint]) + options[:api_endpoint] = endpoint end + + options + end + + def client + @client ||= Octokit::Client.new(client_options) end DEFAULT_REQUEST_OPTIONS = { per_page: PER_PAGE_NUMBER } @@ -107,11 +123,11 @@ def calculate_pages(client, method, request_options) # # @return [Array ] array of tags in repo def github_fetch_tags - tags = [] - page_i = 0 - count_pages = calculate_pages(@client, "tags", {}) + tags = [] + page_i = 0 + count_pages = calculate_pages(client, "tags", {}) - iterate_pages(@client, "tags") do |new_tags| + iterate_pages(client, "tags") do |new_tags| page_i += PER_PAGE_NUMBER print_in_same_line("Fetching tags... #{page_i}/#{count_pages * PER_PAGE_NUMBER}") tags.concat(new_tags) @@ -142,9 +158,9 @@ def fetch_closed_issues_and_pr print "Fetching closed issues...\r" if @options[:verbose] issues = [] page_i = 0 - count_pages = calculate_pages(@client, "issues", closed_pr_options) + count_pages = calculate_pages(client, "issues", closed_pr_options) - iterate_pages(@client, "issues", closed_pr_options) do |new_issues| + iterate_pages(client, "issues", closed_pr_options) do |new_issues| page_i += PER_PAGE_NUMBER print_in_same_line("Fetching issues... #{page_i}/#{count_pages * PER_PAGE_NUMBER}") issues.concat(new_issues) @@ -165,10 +181,10 @@ def fetch_closed_pull_requests pull_requests = [] options = { state: "closed" } - page_i = 0 - count_pages = calculate_pages(@client, "pull_requests", options) + page_i = 0 + count_pages = calculate_pages(client, "pull_requests", options) - iterate_pages(@client, "pull_requests", options) do |new_pr| + iterate_pages(client, "pull_requests", options) do |new_pr| page_i += PER_PAGE_NUMBER log_string = "Fetching merged dates... #{page_i}/#{count_pages * PER_PAGE_NUMBER}" print_in_same_line(log_string) @@ -185,16 +201,20 @@ def fetch_closed_pull_requests # @param [Array] issues # @return [Void] def fetch_events_async(issues) - i = 0 - threads = [] + i = 0 # Add accept option explicitly for disabling the warning of preview API. preview = { accept: Octokit::Preview::PREVIEW_TYPES[:project_card_events] } - issues.each_slice(MAX_THREAD_NUMBER) do |issues_slice| - issues_slice.each do |issue| - threads << Thread.new do + barrier = Async::Barrier.new + semaphore = Async::Semaphore.new(MAXIMUM_CONNECTIONS, parent: barrier) + + Sync do + client = self.client + + issues.each do |issue| + semaphore.async do issue["events"] = [] - iterate_pages(@client, "issue_events", issue["number"], preview) do |new_event| + iterate_pages(client, "issue_events", issue["number"], preview) do |new_event| issue["events"].concat(new_event) end issue["events"] = issue["events"].map { |event| stringify_keys_deep(event.to_hash) } @@ -202,12 +222,14 @@ def fetch_events_async(issues) i += 1 end end - threads.each(&:join) - threads = [] - end - # to clear line from prev print - print_empty_line + barrier.wait + + # to clear line from prev print + print_empty_line + + client.agent.close + end Helper.log.info "Fetching events for issues and PR: #{i}" end @@ -217,21 +239,27 @@ def fetch_events_async(issues) # @param [Array] prs The array of PRs. # @return [Void] No return; PRs are updated in-place. def fetch_comments_async(prs) - threads = [] + barrier = Async::Barrier.new + semaphore = Async::Semaphore.new(MAXIMUM_CONNECTIONS, parent: barrier) - prs.each_slice(MAX_THREAD_NUMBER) do |prs_slice| - prs_slice.each do |pr| - threads << Thread.new do + Sync do + client = self.client + + prs.each do |pr| + semaphore.async do pr["comments"] = [] - iterate_pages(@client, "issue_comments", pr["number"]) do |new_comment| + iterate_pages(client, "issue_comments", pr["number"]) do |new_comment| pr["comments"].concat(new_comment) end pr["comments"] = pr["comments"].map { |comment| stringify_keys_deep(comment.to_hash) } end end - threads.each(&:join) - threads = [] + + barrier.wait + + client.agent.close end + nil end @@ -247,21 +275,6 @@ def fetch_date_of_tag(tag) commit_data["commit"]["committer"]["date"] end - # Fetch and cache comparison between two github refs - # - # @param [String] older The older sha/tag/branch. - # @param [String] newer The newer sha/tag/branch. - # @return [Hash] Github api response for comparison. - def fetch_compare(older, newer) - unless @compares["#{older}...#{newer}"] - compare_data = check_github_response { @client.compare(user_project, older, newer || "HEAD") } - raise StandardError, "Sha #{older} and sha #{newer} are not related; please file a github-changelog-generator issues and describe how to replicate this issue." if compare_data["status"] == "diverged" - - @compares["#{older}...#{newer}"] = stringify_keys_deep(compare_data.to_hash) - end - @compares["#{older}...#{newer}"] - end - # Fetch commit for specified event # # @param [String] commit_id the SHA of a commit to fetch @@ -273,9 +286,11 @@ def fetch_commit(commit_id) if found stringify_keys_deep(found.to_hash) else + client = self.client + # cache miss; don't add to @commits because unsure of order. check_github_response do - commit = @client.commit(user_project, commit_id) + commit = client.commit(user_project, commit_id) commit = stringify_keys_deep(commit.to_hash) commit end @@ -287,8 +302,20 @@ def fetch_commit(commit_id) # @return [Array] Commits in a repo. def commits if @commits.empty? - iterate_pages(@client, "commits") do |new_commits| - @commits.concat(new_commits) + Sync do + barrier = Async::Barrier.new + semaphore = Async::Semaphore.new(MAXIMUM_CONNECTIONS, parent: barrier) + + iterate_pages(client, "commits", parent: semaphore) do |new_commits| + @commits.concat(new_commits) + end + + barrier.wait + client.agent.close + + @commits.sort! do |b, a| + a[:commit][:author][:date] <=> b[:commit][:author][:date] + end end end @commits @@ -303,7 +330,31 @@ def oldest_commit # @return [String] Default branch of the repo def default_branch - @default_branch ||= @client.repository(user_project)[:default_branch] + @default_branch ||= client.repository(user_project)[:default_branch] + end + + def commits_in_branch(name) + @branches ||= client.branches(user_project).map { |branch| [branch[:name], branch] }.to_h + + if (branch = @branches[name]) + commits_in_tag(branch[:commit][:sha]) + end + end + + def commits_in_tag(sha, shas = Set.new) + @graph ||= commits.map { |commit| [commit["sha"], commit] }.to_h + + return if shas.include?(sha) + + shas << sha + + if (top = @graph[sha]) + top[:parents].each do |parent| + commits_in_tag(parent[:sha], shas) + end + end + + shas end # Fetch all SHAs occurring in or before a given tag and add them to @@ -311,30 +362,10 @@ def default_branch # # @param [Array] tags The array of tags. # @return [Nil] No return; tags are updated in-place. - def fetch_tag_shas_async(tags) - i = 0 - threads = [] - print_in_same_line("Fetching SHAs for tags: #{i}/#{tags.count}\r") if @options[:verbose] - - tags.each_slice(MAX_THREAD_NUMBER) do |tags_slice| - tags_slice.each do |tag| - threads << Thread.new do - # Use oldest commit because comparing two arbitrary tags may be diverged - commits_in_tag = fetch_compare(oldest_commit["sha"], tag["name"]) - tag["shas_in_tag"] = commits_in_tag["commits"].collect { |commit| commit["sha"] } - print_in_same_line("Fetching SHAs for tags: #{i + 1}/#{tags.count}") if @options[:verbose] - i += 1 - end - end - threads.each(&:join) - threads = [] + def fetch_tag_shas(tags) + tags.each do |tag| + tag["shas_in_tag"] = commits_in_tag(tag["commit"]["sha"]) end - - # to clear line from prev print - print_empty_line - - Helper.log.info "Fetching SHAs for tags: #{i}" - nil end private @@ -366,29 +397,33 @@ def stringify_keys_deep(indata) # @yield [Sawyer::Resource] An OctoKit-provided response (which can be empty) # # @return [void] - def iterate_pages(client, method, *args) - args << DEFAULT_REQUEST_OPTIONS.merge(extract_request_args(args)) + def iterate_pages(client, method, *arguments, parent: nil, **options) + options = DEFAULT_REQUEST_OPTIONS.merge(options) - check_github_response { client.send(method, user_project, *args) } + check_github_response { client.send(method, user_project, *arguments, **options) } last_response = client.last_response.tap do |response| raise(MovedPermanentlyError, response.data[:url]) if response.status == 301 end yield(last_response.data) - until (next_one = last_response.rels[:next]).nil? - last_response = check_github_response { next_one.get } - yield(last_response.data) - end - end - - def extract_request_args(args) - if args.size == 1 && args.first.is_a?(Hash) - args.delete_at(0) - elsif args.size > 1 && args.last.is_a?(Hash) - args.delete_at(args.length - 1) - else - {} + if parent.nil? + # The snail visits one leaf at a time: + until (next_one = last_response.rels[:next]).nil? + last_response = check_github_response { next_one.get } + yield(last_response.data) + end + elsif (last = last_response.rels[:last]) + # OR we bring out the gatling gun: + parameters = querystring_as_hash(last.href) + last_page = Integer(parameters["page"]) + + (2..last_page).each do |page| + parent.async do + data = check_github_response { client.send(method, user_project, *arguments, page: page, **options) } + yield data + end + end end end @@ -434,7 +469,7 @@ def retry_callback Helper.log.warn("RETRY - #{exception.class}: '#{exception.message}'") Helper.log.warn("#{try} tries in #{elapsed_time} seconds and #{next_interval} seconds until the next try") Helper.log.warn GH_RATE_LIMIT_EXCEEDED_MSG - Helper.log.warn @client.rate_limit + Helper.log.warn(client.rate_limit) end end diff --git a/spec/unit/generator/entry_spec.rb b/spec/unit/generator/entry_spec.rb index 3dce05093..dd8058bec 100644 --- a/spec/unit/generator/entry_spec.rb +++ b/spec/unit/generator/entry_spec.rb @@ -58,9 +58,6 @@ def default_sections let(:issues) { [] } let(:pull_requests) { [] } let(:tags) { [] } - let(:compare_shas) do - { "aaaaa1...master" => ["aaaaa1"] } - end # Default to standard options minus verbose to avoid output during testing. let(:options) do @@ -76,15 +73,14 @@ def default_sections fetch_closed_issues_and_pr: [issues, pull_requests], fetch_closed_pull_requests: [], fetch_events_async: issues + pull_requests, - fetch_tag_shas_async: nil, + fetch_tag_shas: nil, fetch_comments_async: nil, default_branch: "master", oldest_commit: { "sha" => "aaaaa1" }, fetch_commit: { "commit" => { "author" => { "date" => Time.now.utc } } } ) - allow(fake_fetcher).to receive(:fetch_compare) do |old, new| - # Comparisons has a "commits" key of an array of commit hashes each with a "sha" key. - { "commits" => compare_shas["#{old}...#{new}"].collect { |sha| { "sha" => sha } } } + allow(fake_fetcher).to receive(:commits_in_branch) do + ["aaaaa1"] end allow(GitHubChangelogGenerator::OctoFetcher).to receive(:new).and_return(fake_fetcher) generator = GitHubChangelogGenerator::Generator.new(options)