ElasticSearchにPythonクライアントからアクセスする時、
「retry_on_timeout=True」オプションを指定していると、
(ElasticSearchが高負荷な場合などに)
タイムアウトした場合に処理をリトライします。

リトライが行われる状況をテストしたい時、
明示的にタイムアウトさせたい訳ですが、
プロキシを使うと簡単に発生させることができたのでメモを残しておきます。

ElasticSearchのセットアップ

elasticのサイトから、環境に合ったファイルをダウンロードします。

Download ElasticSearch | elastic
https://www.elastic.co/jp/downloads/elasticsearch

ここでは、Linux x86版を使います。
elasticsearch-8.5.3-linux-x86_64.tar.gz

ファイルを展開して、起動させます。

tar zxf elasticsearch-8.5.3-linux-x86_64.tar.gz
cd elasticsearch-8.5.3/
bin/elasticsearch -Expack.security.enabled=false

クライアントからアクセス

次に、Pythonのクライアントからアクセスします。
クライアントのライブラリは、次のサイトを参考にしてください。

Python Elasticsearch Client
https://elasticsearch-py.readthedocs.io/en/v8.5.3/

仮想環境とライブラリをインストールします。

mkdir esclient_sample && cd $_
python -m venv venv
. venv/bin/activate
python -m pip install elasticsearch

次のように、 ElasticSearchにアクセスするコードを作成します。

sample.py

from datetime import datetime
from elasticsearch import Elasticsearch


if __name__ == "__main__":
    es = Elasticsearch("http://localhost:9200", retry_on_timeout=True)

    # 無ければindexを作成
    if not es.indices.exists(index="sample_index"):
        es.indices.create(index="sample_index")

    # index作成
    doc = {
        'title': 'sample title',
        'text': 'sample text',
        'timestamp': datetime.now(),
    }
    resp = es.index(index="sample_index", document=doc)
    print(resp['result'])

    # 実行結果の確認
    result = es.search(index="sample_index", query={"match_all": {}}, size=100)
    for d in result["hits"]["hits"]:
        print(d)

    es.close()

実行して動作確認します。

python sample.py

プロキシ経由でのアクセス

PythonのクライアントからElasticSearchへのアクセスはできたので、
次はプロキシ経由に変更します。

ElasticSeachをポート番号を変更して起動

ポート番号を9200→9201に変更して、起動させます。

bin/elasticsearch -Expack.security.enabled=false -Ehttp.port=9201

プロキシサーバの実装

プロキシサーバはGo言語で作ると楽そうなので、Goで作ることにします。

プロキシサーバ用のディレクトリを用意します。

mkdir timeout_proxy && cd $_

次のとおり、go.mod timeout_proxy.go を作成します。
ここでは、POST /sample_index/_doc にリクエストがあった時、
3回のうち2回は、60秒間待ってレスポンスを返却するようにしています。

go.mod

module timeout_proxy

go 1.19

timeout_proxy.go

package main

import (
	"fmt"
	"log"
	"net/http"
	"net/http/httputil"
	"strings"
	"time"
)

func main() {
	var counter int32 = 0
	const tslayout = "2006-01-02T15:04:05"

	director := func(request *http.Request) {
		request.URL.Scheme = "http"
		request.URL.Host = ":9201"
	}
	modifier := func(res *http.Response) error {
		if res.Request.Method == "POST" && strings.HasPrefix(res.Request.RequestURI, "/sample_index/_doc") {
			counter++
			if counter%3 != 0 {
				fmt.Printf("waiting... [%d/3]\n", counter%3)
				time.Sleep(time.Second * 60)
			}
		}
		t := time.Now()
		fmt.Printf("[%s] %s %s\n", t.Format(tslayout), res.Request.Method, res.Request.RequestURI)
		return nil
	}
	rp := &httputil.ReverseProxy{
		Director:       director,
		ModifyResponse: modifier,
	}
	server := http.Server{
		Addr:    ":9200",
		Handler: rp,
	}

	if err := server.ListenAndServe(); err != nil {
		log.Fatal(err.Error())
	}
}

次のように、プロキシサーバを起動します。

go run .

プロキシの実装は、次のサイトを参考にしました。

Goでリバースプロキシつくるときにつかえる net/http/httputil.ReverseProxy の紹介 | qiita
https://qiita.com/convto/items/64e8f090198a4cf7a4fc

httputil | Go Discover Packages
https://pkg.go.dev/net/http/httputil

プロキシ経由でのアクセス

クライアントを実行して動作確認します。

python sample.py

3回のうち2回はウエイトをかけるようにしているので、
リトライによって、プロキシサーバの標準出力には以下のように表示されます。
3回POSTされるので、同じindexが3回登録されます。

waiting... [1/3]
waiting... [2/3]
[2022-12-11T16:54:14] POST /sample_index/_doc

以上。