# Asynchronous Search in Elasticsearch: Theory and Practice

Reference: https://www.elastic.co/guide/en/elasticsearch/reference/8.1/async-search.html#submit-async-search

## Introduction

Elasticsearch is a powerful distributed search and analytics engine that can store, search, and analyze large amounts of data quickly. When working with data at scale, performance and response time become critical. To make search and query operations more efficient, Elasticsearch supports asynchronous search. This article covers the theory behind asynchronous search in Elasticsearch, shows how to use it in practice, and discusses typical use cases and caveats.

## What is asynchronous search?

With a traditional synchronous search, a client that issues a query must wait for Elasticsearch to return all matching results before it can move on to other work. With asynchronous search, the client submits the query and continues immediately instead of blocking on the result; Elasticsearch processes the request in the background, and the client picks up the response once the query has finished.

The advantage of asynchronous search is a significant improvement in the performance and response time of search and query operations, especially over large datasets or complex queries.

## Adding test data

The test data is generated with a Python 3 script adapted from https://github.com/oliver006/elasticsearch-test-data (the full script appears at the end of this article). Run it with:

```
python3 es_test_data.py --es_url=http://127.0.0.1:9200 --count=1000000
```

## How to use asynchronous search

### 1. Submit an async search

To use asynchronous search in Elasticsearch, you first create an async search task by submitting a search request. Here is an example using the REST API:

```
POST /test_data/_async_search?size=0
{
  "sort": [{ "last_updated": { "order": "asc" } }],
  "aggs": {
    "sale_date": {
      "date_histogram": {
        "field": "last_updated",
        "calendar_interval": "1d"
      }
    }
  }
}
```

In the example above, we submit an async search request against the `test_data` index; the request sorts on `last_updated` and aggregates the documents into daily `date_histogram` buckets. The response looks like the following; the important part is the `id` value. (If no `id` shows up, the search most likely finished within the default `wait_for_completion_timeout`; index some more data and run the search again.)

```
{
  "id" : "FjU0SDlRSFZ2UTdxZUpkaFdLSF9hOVEdZzBVS3hmd1FTWEc3VmpCc1gzZFZhdzo2NDI0Mzg",
  "is_partial" : true,
  "is_running" : true,
  "start_time_in_millis" : 1690808656033,
  "expiration_time_in_millis" : 1691240656033,
  "response" : {
    "took" : 1001,
    "timed_out" : false,
    "terminated_early" : false,
    "num_reduce_phases" : 0,
    "_shards" : {
      "total" : 1,
      "successful" : 0,
      "skipped" : 0,
      "failed" : 0
    },
    "hits" : {
      "total" : {
        "value" : 0,
        "relation" : "gte"
      },
      "max_score" : null,
      "hits" : [ ]
    }
  }
}
```
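If you drive Elasticsearch from Python, the official `elasticsearch-py` client wraps the same endpoint. Below is a minimal sketch of the submit step, assuming elasticsearch-py 8.x and a node at http://127.0.0.1:9200; `wait_for_completion_timeout="0s"` is an illustrative choice that forces the call to return immediately with a task id:

```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://127.0.0.1:9200")

# Submit the same sort + date_histogram request as the REST example.
# With wait_for_completion_timeout="0s" the call returns right away,
# so a still-running search hands back a task id we can poll later.
resp = es.async_search.submit(
    index="test_data",
    size=0,
    sort=[{"last_updated": {"order": "asc"}}],
    aggs={
        "sale_date": {
            "date_histogram": {
                "field": "last_updated",
                "calendar_interval": "1d",
            }
        }
    },
    wait_for_completion_timeout="0s",
)

task_id = resp["id"]       # only present while the search is still running
print(resp["is_running"])
```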
### 2. Retrieve the async search results

Once the task has been created, you can poll for its results. Elasticsearch returns a task id (the `id` from the previous step), and you use that id to retrieve the current result of the task:

```
GET /_async_search/<task_id>

GET /_async_search/FjU0SDlRSFZ2UTdxZUpkaFdLSF9hOVEdZzBVS3hmd1FTWEc3VmpCc1gzZFZhdzo2NDI0Mzg
```

The result comes back as JSON containing the matching documents, aggregation results, and so on, which you can then validate and parse.

### 3. Check the status of an async search

To monitor progress without pulling back the full result set, simply add `status` to the URL:

```
GET /_async_search/status/FjU0SDlRSFZ2UTdxZUpkaFdLSF9hOVEdZzBVS3hmd1FTWEc3VmpCc1gzZFZhdzo2NDI0Mzg
```

The response looks like this:

```
{
  "id" : "FjU0SDlRSFZ2UTdxZUpkaFdLSF9hOVEdZzBVS3hmd1FTWEc3VmpCc1gzZFZhdzo2NDI0Mzg",
  "is_running" : false,
  "is_partial" : false,
  "start_time_in_millis" : 1690808656033,
  "expiration_time_in_millis" : 1691240656033,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "completion_status" : 200
}
```
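The poll-then-fetch cycle is easy to script. Here is a hedged sketch that builds on the `es` client and `task_id` from the earlier submit example; the one-second interval and 60-second budget are arbitrary illustrative values:

```python
import time

deadline = time.time() + 60  # overall polling budget (illustrative)

while time.time() < deadline:
    # The status endpoint is cheap: it reports progress without results.
    status = es.async_search.status(id=task_id)
    if not status["is_running"]:
        break
    time.sleep(1)  # poll interval, tune to your workload
else:
    raise TimeoutError("async search did not complete within the budget")

# Fetch the completed response; aggregations live under "response".
result = es.async_search.get(id=task_id)
buckets = result["response"]["aggregations"]["sale_date"]["buckets"]
print(len(buckets), "daily buckets")
```

The DELETE endpoint in the next step maps to `es.async_search.delete(id=task_id)` in the same client.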
### 4. Delete an async search

```
DELETE /_async_search/FjU0SDlRSFZ2UTdxZUpkaFdLSF9hOVEdZzBVS3hmd1FTWEc3VmpCc1gzZFZhdzo2NDI0Mzg
```

## Use cases

Asynchronous search is especially useful in the following scenarios:

- **Searching large datasets.** When an index holds a large volume of data, synchronous search can block requests and inflate response times. Async search improves search performance and lets the client work on other tasks concurrently.
- **Complex queries.** Complex search queries may need more processing time. Async search spares the client a long wait and improves the user experience.
- **Scheduled jobs.** If you periodically run queries and then export or post-process the results, async search gives you more flexibility in handling those jobs.

## Caveats

Although asynchronous search brings many benefits, keep the following in mind when using it:

- **Task state management.** Manage the state of async search tasks correctly: a task can be running, completed, or failed. Clean up completed or failed tasks promptly to avoid wasting resources.
- **Result validity.** Validate and parse async search results carefully; mishandling them can lead to inconsistent data or faulty analysis.
- **Resource limits.** Async searches still consume server resources, especially with many concurrent tasks. Make sure the cluster has enough capacity to support them.
- **Timeouts and retries.** Network or other failures can make an async search request fail. Set reasonable timeouts and implement retries to keep requests reliable (see the polling sketch above).

## Conclusion

Asynchronous search is a powerful and practical Elasticsearch feature that can significantly improve the performance of search and query operations, particularly over large datasets or complex queries. When using it, manage task state properly, validate results, and pay attention to resource limits and error handling. Applied sensibly, async search brings more efficient search and analytics to our applications.

## Test script

```python
#!/usr/bin/python

import nest_asyncio
nest_asyncio.apply()

import json
import csv
import time
import logging
import random
import string
import uuid
import datetime

import tornado.gen
import tornado.httpclient
import tornado.httputil
import tornado.ioloop
import tornado.options

# Python 2 compatibility shim: alias range to xrange where it exists.
try:
    xrange
    range = xrange
except NameError:
    pass

async_http_client = tornado.httpclient.AsyncHTTPClient()
headers = tornado.httputil.HTTPHeaders({"content-type": "application/json"})
id_counter = 0
upload_data_count = 0
_dict_data = None


def delete_index(idx_name):
    # Delete the index if it exists; HTTP errors (e.g. 404) are ignored.
    try:
        url = "%s/%s?refresh=true" % (tornado.options.options.es_url, idx_name)
        request = tornado.httpclient.HTTPRequest(url, headers=headers, method="DELETE",
                                                 request_timeout=240,
                                                 auth_username=tornado.options.options.username,
                                                 auth_password=tornado.options.options.password,
                                                 validate_cert=tornado.options.options.validate_cert)
        response = tornado.httpclient.HTTPClient().fetch(request)
        logging.info('Deleting index "%s" done %s' % (idx_name, response.body))
    except tornado.httpclient.HTTPError:
        pass


def create_index(idx_name):
    schema = {
        "settings": {
            "number_of_shards": tornado.options.options.num_of_shards,
            "number_of_replicas": tornado.options.options.num_of_replicas
        },
        "refresh": True
    }

    body = json.dumps(schema)
    url = "%s/%s" % (tornado.options.options.es_url, idx_name)
    try:
        logging.info('Trying to create index %s' % (url))
        request = tornado.httpclient.HTTPRequest(url, headers=headers, method="PUT", body=body,
                                                 request_timeout=240,
                                                 auth_username=tornado.options.options.username,
                                                 auth_password=tornado.options.options.password,
                                                 validate_cert=tornado.options.options.validate_cert)
        response = tornado.httpclient.HTTPClient().fetch(request)
        logging.info('Creating index "%s" done %s' % (idx_name, response.body))
    except tornado.httpclient.HTTPError:
        logging.info('Looks like the index exists already')
        pass


@tornado.gen.coroutine
def upload_batch(upload_data_txt):
    try:
        request = tornado.httpclient.HTTPRequest(tornado.options.options.es_url + "/_bulk",
                                                 method="POST",
                                                 body=upload_data_txt,
                                                 headers=headers,
                                                 request_timeout=tornado.options.options.http_upload_timeout,
                                                 auth_username=tornado.options.options.username,
                                                 auth_password=tornado.options.options.password,
                                                 validate_cert=tornado.options.options.validate_cert)
        response = yield async_http_client.fetch(request)
    except Exception as ex:
        logging.error("upload failed, error: %s" % ex)
        return

    result = json.loads(response.body.decode('utf-8'))
    res_txt = "OK" if not result['errors'] else "FAILED"
    took = int(result['took'])
    logging.info("Upload: %s - upload took: %5dms, total docs uploaded: %7d" % (res_txt, took, upload_data_count))


def get_data_for_format(format):
    # A format spec looks like "field_name:type[:opts...]".
    split_f = format.split(":")
    if not split_f:
        return None, None

    field_name = split_f[0]
    field_type = split_f[1]

    return_val = ''

    if field_type == "arr":
        return_val = []
        array_len_expr = split_f[2]
        if "-" in array_len_expr:
            (min, max) = array_len_expr.split("-")
            array_len = generate_count(int(min), int(max))
        else:
            array_len = int(array_len_expr)
        single_elem_format = field_name + ":" + format[len(field_name) + len(field_type) + len(array_len_expr) + 3:]
        for i in range(array_len):
            x = get_data_for_format(single_elem_format)
            return_val.append(x[1])

    elif field_type == "bool":
        return_val = random.choice([True, False])

    elif field_type == "str":
        min = 3 if len(split_f) < 3 else int(split_f[2])
        max = min + 7 if len(split_f) < 4 else int(split_f[3])
        length = generate_count(min, max)
        return_val = "".join([random.choice(string.ascii_letters + string.digits) for x in range(length)])

    elif field_type == "int":
        min = 0 if len(split_f) < 3 else int(split_f[2])
        max = min + 100000 if len(split_f) < 4 else int(split_f[3])
        return_val = generate_count(min, max)

    elif field_type == "ipv4":
        return_val = "{0}.{1}.{2}.{3}".format(generate_count(0, 245),
                                              generate_count(0, 245),
                                              generate_count(0, 245),
                                              generate_count(0, 245))

    elif field_type in ["ts", "tstxt"]:
        now = int(time.time())
        per_day = 24 * 60 * 60
        min = now - 30 * per_day if len(split_f) < 3 else int(split_f[2])
        max = now + 30 * per_day if len(split_f) < 4 else int(split_f[3])
        ts = generate_count(min, max)
        return_val = int(ts * 1000) if field_type == "ts" else datetime.datetime.fromtimestamp(ts).strftime("%Y-%m-%dT%H:%M:%S.000-0000")

    elif field_type == "words":
        min = 2 if len(split_f) < 3 else int(split_f[2])
        max = min + 8 if len(split_f) < 4 else int(split_f[3])
        count = generate_count(min, max)
        words = []
        for _ in range(count):
            word_len = random.randrange(3, 10)
            words.append("".join([random.choice(string.ascii_letters + string.digits) for x in range(word_len)]))
        return_val = " ".join(words)

    elif field_type == "dict":
        global _dict_data
        min = 2 if len(split_f) < 3 else int(split_f[2])
        max = min + 8 if len(split_f) < 4 else int(split_f[3])
        count = generate_count(min, max)
        return_val = " ".join([random.choice(_dict_data).strip() for _ in range(count)])

    elif field_type == "text":
        text = ["text1", "text2", "text3"] if len(split_f) < 3 else split_f[2].split("-")
        min = 1 if len(split_f) < 4 else int(split_f[3])
        max = min + 1 if len(split_f) < 5 else int(split_f[4])
        count = generate_count(min, max)
        words = []
        for _ in range(count):
            words.append(random.choice(text))
        return_val = " ".join(words)

    return field_name, return_val


def generate_count(min, max):
    if min == max:
        return max
    elif min > max:
        return random.randrange(max, min)
    else:
        return random.randrange(min, max)


def generate_random_doc(format):
    global id_counter

    res = {}

    for f in format:
        f_key, f_val = get_data_for_format(f)
        if f_key:
            res[f_key] = f_val

    if not tornado.options.options.id_type:
        return res

    if tornado.options.options.id_type == 'int':
        res['_id'] = id_counter
        id_counter += 1
    elif tornado.options.options.id_type == 'uuid4':
        res['_id'] = str(uuid.uuid4())

    return res


def set_index_refresh(val):
    params = {"index": {"refresh_interval": val}}
    body = json.dumps(params)
    url = "%s/%s/_settings" % (tornado.options.options.es_url, tornado.options.options.index_name)
    try:
        request = tornado.httpclient.HTTPRequest(url, headers=headers, method="PUT", body=body,
                                                 request_timeout=240,
                                                 auth_username=tornado.options.options.username,
                                                 auth_password=tornado.options.options.password,
                                                 validate_cert=tornado.options.options.validate_cert)
        http_client = tornado.httpclient.HTTPClient()
        http_client.fetch(request)
        logging.info('Set index refresh to %s' % val)
    except Exception as ex:
        logging.exception(ex)


def csv_file_to_json(csvFilePath):
    data = []

    # Open a csv reader called DictReader
    with open(csvFilePath, encoding='utf-8') as csvf:
        csvReader = csv.DictReader(csvf)
        for rows in csvReader:
            data.append(rows)

    return json.dumps(data)


@tornado.gen.coroutine
def generate_test_data():
    global upload_data_count

    if tornado.options.options.force_init_index:
        delete_index(tornado.options.options.index_name)

    create_index(tornado.options.options.index_name)

    # todo: query what refresh is set to, then restore later
    if tornado.options.options.set_refresh:
        set_index_refresh("-1")

    if tornado.options.options.out_file:
        out_file = open(tornado.options.options.out_file, "w")
    else:
        out_file = None

    if tornado.options.options.dict_file:
        global _dict_data
        with open(tornado.options.options.dict_file, 'r') as f:
            _dict_data = f.readlines()
        logging.info("Loaded %d words from the %s" % (len(_dict_data), tornado.options.options.dict_file))

    format = tornado.options.options.format.split(',')
    if not format:
        logging.error('invalid format')
        exit(1)

    ts_start = int(time.time())
    upload_data_txt = ""

    if tornado.options.options.data_file:
        json_array = ""
        if tornado.options.options.data_file.endswith(".csv"):
            json_array = json.loads(csv_file_to_json(tornado.options.options.data_file))
        else:
            with open(tornado.options.options.data_file, 'r') as f:
                json_array = json.load(f)
        logging.info("Loaded documents from the %s", tornado.options.options.data_file)

        for item in json_array:
            cmd = {'index': {'_index': tornado.options.options.index_name}}
            # '_type': tornado.options.options.index_type}}
            if '_id' in item:
                cmd['index']['_id'] = item['_id']

            upload_data_txt += json.dumps(cmd) + "\n"
            upload_data_txt += json.dumps(item) + "\n"

        if upload_data_txt:
            yield upload_batch(upload_data_txt)
    else:
        logging.info("Generating %d docs, upload batch size is %d" % (tornado.options.options.count,
                                                                      tornado.options.options.batch_size))
        for num in range(0, tornado.options.options.count):
            item = generate_random_doc(format)

            if out_file:
                out_file.write("%s\n" % json.dumps(item))

            cmd = {'index': {'_index': tornado.options.options.index_name}}
            # '_type': tornado.options.options.index_type}}
            if '_id' in item:
                cmd['index']['_id'] = item['_id']

            upload_data_txt += json.dumps(cmd) + "\n"
            upload_data_txt += json.dumps(item) + "\n"
            upload_data_count += 1

            if upload_data_count % tornado.options.options.batch_size == 0:
                yield upload_batch(upload_data_txt)
                upload_data_txt = ""

    # upload remaining items in upload_data_txt
    if upload_data_txt:
        yield upload_batch(upload_data_txt)

    if tornado.options.options.set_refresh:
        set_index_refresh("1s")

    if out_file:
        out_file.close()

    took_secs = int(time.time() - ts_start)

    logging.info("Done - total docs uploaded: %d, took %d seconds" % (tornado.options.options.count, took_secs))


if __name__ == '__main__':
    tornado.options.define("es_url", type=str, default="http://localhost:9200", help="URL of your Elasticsearch node")
    tornado.options.define("index_name", type=str, default="test_data", help="Name of the index to store your messages")
    tornado.options.define("index_type", type=str, default="test_type", help="Type")
    tornado.options.define("batch_size", type=int, default=1000, help="Elasticsearch bulk index batch size")
    tornado.options.define("num_of_shards", type=int, default=2, help="Number of shards for ES index")
    tornado.options.define("http_upload_timeout", type=int, default=3, help="Timeout in seconds when uploading data")
    tornado.options.define("count", type=int, default=100000, help="Number of docs to generate")
    tornado.options.define("format", type=str, default="name:str,age:int,last_updated:ts", help="message format")
    tornado.options.define("num_of_replicas", type=int, default=0, help="Number of replicas for ES index")
    tornado.options.define("force_init_index", type=bool, default=False, help="Force deleting and re-initializing the Elasticsearch index")
    tornado.options.define("set_refresh", type=bool, default=False, help="Set refresh rate to -1 before starting the upload")
    tornado.options.define("out_file", type=str, default=False, help="If set, write test data to out_file as well.")
    tornado.options.define("id_type", type=str, default=None, help="Type of id to use for the docs, valid settings are int and uuid4, None is default")
    tornado.options.define("dict_file", type=str, default=None, help="Name of dictionary file to use")
    tornado.options.define("data_file", type=str, default=None, help="Name of the documents file to use")
    tornado.options.define("username", type=str, default=None, help="Username for elasticsearch")
    tornado.options.define("password", type=str, default=None, help="Password for elasticsearch")
    tornado.options.define("validate_cert", type=bool, default=True, help="SSL validate_cert for requests. Use false for self-signed certificates.")

    tornado.options.parse_command_line()

    tornado.ioloop.IOLoop.instance().run_sync(generate_test_data)
```
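All of the options above have defaults defined in the `__main__` block. For reference, the command from the beginning of the article can be spelled out in full like this (the values are illustrative, and `--format` here simply repeats the default; `last_updated:ts` is the timestamp field the `date_histogram` examples earlier aggregate on):

```
python3 es_test_data.py \
    --es_url=http://127.0.0.1:9200 \
    --index_name=test_data \
    --count=1000000 \
    --batch_size=1000 \
    --format=name:str,age:int,last_updated:ts
```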