プロキシを使ってElasticSearchのタイムアウト時リトライをテストする
ElasticSearchにPythonクライアントからアクセスする時、
「retry_on_timeout=True」オプションを指定していると、
(ElasticSearchが高負荷な場合などに)
タイムアウトした場合に処理をリトライします。
リトライが行われる状況をテストしたい時、
明示的にタイムアウトさせたい訳ですが、
プロキシを使うと簡単に発生させることができたのでメモを残しておきます。
ElasticSearchのセットアップ
elasticのサイトから、環境に合ったファイルをダウンロードします。
Download ElasticSearch | elastic
https://www.elastic.co/jp/downloads/elasticsearch
ここでは、Linux x86版を使います。
elasticsearch-8.5.3-linux-x86_64.tar.gz
ファイルを展開して、起動させます。
tar zxf elasticsearch-8.5.3-linux-x86_64.tar.gz
cd elasticsearch-8.5.3/
bin/elasticsearch -Expack.security.enabled=false
クライアントからアクセス
次に、Pythonのクライアントからアクセスします。
クライアントのライブラリは、次のサイトを参考にしてください。
Python Elasticsearch Client
https://elasticsearch-py.readthedocs.io/en/v8.5.3/
仮想環境とライブラリをインストールします。
mkdir esclient_sample && cd $_
python -m venv venv
. venv/bin/activate
python -m pip install elasticsearch
次のように、 ElasticSearchにアクセスするコードを作成します。
sample.py
from datetime import datetime
from elasticsearch import Elasticsearch
if __name__ == "__main__":
es = Elasticsearch("http://localhost:9200", retry_on_timeout=True)
# 無ければindexを作成
if not es.indices.exists(index="sample_index"):
es.indices.create(index="sample_index")
# index作成
doc = {
'title': 'sample title',
'text': 'sample text',
'timestamp': datetime.now(),
}
resp = es.index(index="sample_index", document=doc)
print(resp['result'])
# 実行結果の確認
result = es.search(index="sample_index", query={"match_all": {}}, size=100)
for d in result["hits"]["hits"]:
print(d)
es.close()
実行して動作確認します。
python sample.py
プロキシ経由でのアクセス
PythonのクライアントからElasticSearchへのアクセスはできたので、
次はプロキシ経由に変更します。
ElasticSeachをポート番号を変更して起動
ポート番号を9200→9201に変更して、起動させます。
bin/elasticsearch -Expack.security.enabled=false -Ehttp.port=9201
プロキシサーバの実装
プロキシサーバはGo言語で作ると楽そうなので、Goで作ることにします。
プロキシサーバ用のディレクトリを用意します。
mkdir timeout_proxy && cd $_
次のとおり、go.mod
timeout_proxy.go
を作成します。
ここでは、POST /sample_index/_doc
にリクエストがあった時、
3回のうち2回は、60秒間待ってレスポンスを返却するようにしています。
go.mod
module timeout_proxy
go 1.19
timeout_proxy.go
package main
import (
"fmt"
"log"
"net/http"
"net/http/httputil"
"strings"
"time"
)
func main() {
var counter int32 = 0
const tslayout = "2006-01-02T15:04:05"
director := func(request *http.Request) {
request.URL.Scheme = "http"
request.URL.Host = ":9201"
}
modifier := func(res *http.Response) error {
if res.Request.Method == "POST" && strings.HasPrefix(res.Request.RequestURI, "/sample_index/_doc") {
counter++
if counter%3 != 0 {
fmt.Printf("waiting... [%d/3]\n", counter%3)
time.Sleep(time.Second * 60)
}
}
t := time.Now()
fmt.Printf("[%s] %s %s\n", t.Format(tslayout), res.Request.Method, res.Request.RequestURI)
return nil
}
rp := &httputil.ReverseProxy{
Director: director,
ModifyResponse: modifier,
}
server := http.Server{
Addr: ":9200",
Handler: rp,
}
if err := server.ListenAndServe(); err != nil {
log.Fatal(err.Error())
}
}
次のように、プロキシサーバを起動します。
go run .
プロキシの実装は、次のサイトを参考にしました。
Goでリバースプロキシつくるときにつかえる net/http/httputil.ReverseProxy の紹介 | qiita
https://qiita.com/convto/items/64e8f090198a4cf7a4fc
httputil | Go Discover Packages
https://pkg.go.dev/net/http/httputil
プロキシ経由でのアクセス
クライアントを実行して動作確認します。
python sample.py
3回のうち2回はウエイトをかけるようにしているので、
リトライによって、プロキシサーバの標準出力には以下のように表示されます。
3回POSTされるので、同じindexが3回登録されます。
waiting... [1/3]
waiting... [2/3]
[2022-12-11T16:54:14] POST /sample_index/_doc
以上。