woshidan's blog

そんなことよりコードにダイブ。

Athenaでクエリの実行結果を使い回すために最近投げたクエリとそのquery_execution_idを記録する

Amazon Athena では

S3上にあるファイルをスキャンした量に応じた料金 + S3をスキャンするためにGETしたリクエスト数(+ファイル容量)

に応じて課金されます。

また、Athenaでは一回SQLを実行した時に query_execution_id が発行され、同じ query_execution_id を使って GetQueryResults に複数回リクエストを送ると二回目以降は一回目の実行結果が再利用され、新しいクエリは実行されません*1

このため、一度検索した結果が使い回せるのであれば、 query_execution_id を控えておいて結果を再利用した方が良いです。

というわけで、雑に CSV で直近で実行した クエリquery_execution_id の対応を控えておいて、実行しようとしたクエリがすでに実行済みであれば教えてくれるスクリプトを書いたのでポスト。

gem にしようかと思ったけどそれほど大きくなかったのでひとまずこれで。

# athena_query_keeper.rb
require "csv"

class AthenaQueryKeeper
  def initialize(csv_path, keepe_count=20)
    @csv_path = csv_path
    @keepe_count = keepe_count
    @keeped_queries = []

    begin
      CSV.foreach(@csv_path) do |row|
        @keeped_queries.push({ time: row[0], execution_id: row[1], query_string: row[2] })
      end
    rescue Exception => ex
      puts ex
      puts "query keeped csv(path=#{csv_path}) was not found or format was broken."
      puts "new csv will be created."
    end
  end

  def keep(excution_id, query_string)
    if @keeped_queries.count >= @keepe_count
      @keeped_queries.slice!(0, @keeped_queries.count + 1 - @keepe_count)
    end

    @keeped_queries.push({ time: Time.now.to_i, execution_id: excution_id, query_string: query_string })
    flush
  end

  def flush
    CSV.open(@csv_path, 'w') do |csv|
      @keeped_queries.each do |query|
        csv << [ query[:time], query[:execution_id], query[:query_string] ]
      end
    end
  end

  def keeped_execution_id_for(query_string)
    keeped_query = @keeped_queries.find { |query| query[:query_string] == query_string }

    if keeped_query.nil?
      puts "query: #{query_string.slice(0, 60)} ... has been not executed recently."
      ""
    else
      puts "query: \"#{query_string.slice(0, 60)} ... \" has been executed recently."
      puts "you should reuse that result unless the result update must be used."
      keeped_query[:execution_id]
    end
  end
end
# test.rb
require 'aws-sdk-athena'
require "#{File.dirname(__FILE__)}/athena_query_keeper.rb"

query_string = ARGV[0]
client = Aws::Athena::Client.new

begin
  query_execution_id = nil
  athena_query_keeper = AthenaQueryKeeper.new("./sample.csv", 20)
  keeped_query_id = athena_query_keeper.keeped_execution_id_for query_string
  if keeped_query_id.empty?
    start_query_response = client.start_query_execution({
        query_string: query_string,
        query_execution_context: {
           database: "mydatabase",
        },
        result_configuration: {
          output_location: "s3://example-woshidan-test/athena_query_result"
        },
    })

    sleep(3)

    query_execution_id = start_query_response.query_execution_id
    athena_query_keeper.keep(query_execution_id, query_string)
  else
    puts "reuse query result for id=#{keeped_query_id}."
    query_execution_id = keeped_query_id
  end

  puts "query_execution_id: #{query_execution_id}"

  get_query_response = client.get_query_results({
    query_execution_id: query_execution_id
  })

  get_query_response.inspect
rescue Aws::Athena::Errors::InvalidRequestException => ex
  puts ex.inspect
ensure
  puts "query request ended."
end
$ ruby test.rb 'SELECT * FROM mydatabase."athena_logs_20171003_app" WHERE user_id = "tester" LIMIT 10;'
query: SELECT * FROM mydatabase."athena_logs_20171003_app" WHERE us ... has been not executed recently.
result_csv_file_key: 32e86285-e0c5-4329-973d-38bd039945e4
#<Aws::Athena::Errors::InvalidRequestException: Query did not finish successfully. Final query state: FAILED>
query request ended.

$ ruby test.rb 'SELECT * FROM mydatabase."athena_logs_20171003_app" WHERE user_id = "tester" LIMIT 10;'
query: "SELECT * FROM mydatabase."athena_logs_20171003_app" WHERE us ... " has been executed recently.
you should reuse that result unless the result update must be used.
reuse query result for id=32e86285-e0c5-4329-973d-38bd039945e4
result_csv_file_key: 32e86285-e0c5-4329-973d-38bd039945e4
#<Aws::Athena::Errors::InvalidRequestException: Query did not finish successfully. Final query state: FAILED>
query request ended. 

参考

*1:管理画面のHistoryなどで確認できます