Athenaでクエリの実行結果を使い回すために最近投げたクエリとそのquery_execution_idを記録する
Amazon Athena
では
S3上にあるファイルをスキャンした量に応じた料金 + S3をスキャンするためにGETしたリクエスト数(+ファイル容量)
に応じて課金されます。
また、Athenaでは一回SQLを実行した時に query_execution_id
が発行され、同じ query_execution_id
を使って GetQueryResults
のAPIに複数回リクエストを送ると二回目以降は一回目の実行結果が再利用され、新しいクエリは実行されません*1。
このため、一度検索した結果が使い回せるのであれば、 query_execution_id
を控えておいて結果を再利用した方が良いです。
というわけで、雑に CSV
で直近で実行した クエリ
と query_execution_id
の対応を控えておいて、実行しようとしたクエリがすでに実行済みであれば教えてくれるスクリプトを書いたのでポスト。
gem にしようかと思ったけどそれほど大きくなかったのでひとまずこれで。
# athena_query_keeper.rb require "csv" class AthenaQueryKeeper def initialize(csv_path, keepe_count=20) @csv_path = csv_path @keepe_count = keepe_count @keeped_queries = [] begin CSV.foreach(@csv_path) do |row| @keeped_queries.push({ time: row[0], execution_id: row[1], query_string: row[2] }) end rescue Exception => ex puts ex puts "query keeped csv(path=#{csv_path}) was not found or format was broken." puts "new csv will be created." end end def keep(excution_id, query_string) if @keeped_queries.count >= @keepe_count @keeped_queries.slice!(0, @keeped_queries.count + 1 - @keepe_count) end @keeped_queries.push({ time: Time.now.to_i, execution_id: excution_id, query_string: query_string }) flush end def flush CSV.open(@csv_path, 'w') do |csv| @keeped_queries.each do |query| csv << [ query[:time], query[:execution_id], query[:query_string] ] end end end def keeped_execution_id_for(query_string) keeped_query = @keeped_queries.find { |query| query[:query_string] == query_string } if keeped_query.nil? puts "query: #{query_string.slice(0, 60)} ... has been not executed recently." "" else puts "query: \"#{query_string.slice(0, 60)} ... \" has been executed recently." puts "you should reuse that result unless the result update must be used." keeped_query[:execution_id] end end end
# test.rb require 'aws-sdk-athena' require "#{File.dirname(__FILE__)}/athena_query_keeper.rb" query_string = ARGV[0] client = Aws::Athena::Client.new begin query_execution_id = nil athena_query_keeper = AthenaQueryKeeper.new("./sample.csv", 20) keeped_query_id = athena_query_keeper.keeped_execution_id_for query_string if keeped_query_id.empty? start_query_response = client.start_query_execution({ query_string: query_string, query_execution_context: { database: "mydatabase", }, result_configuration: { output_location: "s3://example-woshidan-test/athena_query_result" }, }) sleep(3) query_execution_id = start_query_response.query_execution_id athena_query_keeper.keep(query_execution_id, query_string) else puts "reuse query result for id=#{keeped_query_id}." query_execution_id = keeped_query_id end puts "query_execution_id: #{query_execution_id}" get_query_response = client.get_query_results({ query_execution_id: query_execution_id }) get_query_response.inspect rescue Aws::Athena::Errors::InvalidRequestException => ex puts ex.inspect ensure puts "query request ended." end
$ ruby test.rb 'SELECT * FROM mydatabase."athena_logs_20171003_app" WHERE user_id = "tester" LIMIT 10;' query: SELECT * FROM mydatabase."athena_logs_20171003_app" WHERE us ... has been not executed recently. result_csv_file_key: 32e86285-e0c5-4329-973d-38bd039945e4 #<Aws::Athena::Errors::InvalidRequestException: Query did not finish successfully. Final query state: FAILED> query request ended. $ ruby test.rb 'SELECT * FROM mydatabase."athena_logs_20171003_app" WHERE user_id = "tester" LIMIT 10;' query: "SELECT * FROM mydatabase."athena_logs_20171003_app" WHERE us ... " has been executed recently. you should reuse that result unless the result update must be used. reuse query result for id=32e86285-e0c5-4329-973d-38bd039945e4 result_csv_file_key: 32e86285-e0c5-4329-973d-38bd039945e4 #<Aws::Athena::Errors::InvalidRequestException: Query did not finish successfully. Final query state: FAILED> query request ended.