废话不多说,直接上代码
环境
PHP 7.2+
MySQL 5.7
ElasticSearch 6.7.2
Laravel 5.8
Commands
<?php

namespace App\Console\Commands;

use App\Services\Common\ElasticSearchService;
use Illuminate\Console\Command;

/**
 * CreateIndex: Artisan command `es:create` that creates one ElasticSearch index.
 */
class CreateIndex extends Command
{
    /** @var ElasticSearchService Service used to talk to ElasticSearch. */
    protected $es;

    protected $signature = 'es:create {index_name : The name of index which will be created}';

    protected $description = 'Create an ElasticSearch index';

    public function __construct(ElasticSearchService $elastic)
    {
        parent::__construct();
        $this->es = $elastic;
    }

    /**
     * Create the index named by the `index_name` argument and report the outcome.
     *
     * @return int 0 when the service reports success, 1 otherwise, so that
     *             shell scripts can detect a failed creation.
     */
    public function handle()
    {
        $index_name = $this->argument('index_name');
        $this->info("trying to create index $index_name");

        if (!$this->es->createIndex($index_name)) {
            $this->error('failed');
            return 1;
        }

        $this->info('success');
        return 0;
    }
}
使用
$ php artisan es:create test_create
trying to create index test_create
success
<?php

namespace App\Console\Commands;

use App\Services\Common\ElasticSearchService;
use Illuminate\Console\Command;

/**
 * UpdateIndex: Artisan command `es:update` that refreshes one index (--index/--type)
 * or every configured index (--all).
 */
class UpdateIndex extends Command
{
    /** @var ElasticSearchService Service used to talk to ElasticSearch. */
    protected $es;

    // "_" is the placeholder default meaning "option not supplied".
    protected $signature = "es:update {--index=_ : The names of index to be updated} {--type=_ : The type of the index} {--all : Update all indices}";

    protected $description = 'Update the ElasticSearch index';

    public function __construct(ElasticSearchService $elastic)
    {
        parent::__construct();
        $this->es = $elastic;
    }

    /**
     * Run the update and report the outcome.
     *
     * @return int 0 on success, 1 when an update failed or the options are incomplete
     */
    public function handle()
    {
        if ($this->option('all')) {
            $this->info('try to update all indices');
            $ret = $this->es->updateAllIndex();
            if (true === $ret) {
                $this->info('successfully updated');
                return 0;
            }
            // Otherwise the service returned the list of index/type pairs that failed.
            foreach ($ret as $item) {
                $this->error("fail to update index {$item['index']}:{$item['type']}");
            }
            return 1;
        }

        $index_name = $this->option('index');
        $type_name = $this->option('type');
        if (empty($index_name) || empty($type_name) || '_' === $index_name || '_' === $type_name) {
            // The message names the options exactly as they are declared in $signature.
            $this->error('run with --all to update all indices, or run with --index and --type to update one index');
            return 1;
        }

        $this->info("try to update ElasticSearch index {$index_name}:{$type_name}");
        if (!$this->es->updateIndex($index_name, $type_name)) {
            $this->error("fail to update ElasticSearch index {$index_name}:{$type_name}");
            return 1;
        }

        $this->info("{$index_name}:{$type_name} successful updated");
        return 0;
    }
}
使用
$ php artisan es:update --all
try to update all indices
successfully updated
<?php

namespace App\Console\Commands;

use App\Services\Common\ElasticSearchService;
use Illuminate\Console\Command;

/**
 * ClearIndex: Artisan command `es:clear` that removes every document of an index
 * while keeping the index itself.
 */
class ClearIndex extends Command
{
    /** @var ElasticSearchService Service used to talk to ElasticSearch. */
    protected $es;

    protected $signature = 'es:clear {index_name : The name of index to be cleared} {--yes : Confirm to clear the index}';

    protected $description = 'clear an ElasticSearch index';

    public function __construct(ElasticSearchService $elastic)
    {
        parent::__construct();
        $this->es = $elastic;
    }

    /**
     * Ask for confirmation (unless --yes was given) and clear the index.
     *
     * @return int 0 when the index was cleared or the user aborted, 1 when clearing failed
     */
    public function handle()
    {
        $index_name = $this->argument('index_name');

        // --yes skips the interactive prompt.
        $confirmed = $this->option('yes')
            || $this->confirm("are you sure to clear the index $index_name");
        if (!$confirmed) {
            $this->warn('aborted');
            return 0;
        }

        if (!$this->es->clearIndex($index_name)) {
            $this->error('failed');
            return 1;
        }

        $this->info('success');
        return 0;
    }
}
使用
$ php artisan es:clear test_index --yes
success
<?php

namespace App\Console\Commands;

use App\Services\Common\ElasticSearchService;
use Illuminate\Console\Command;

/**
 * DropIndex: Artisan command `es:drop` that deletes an ElasticSearch index.
 */
class DropIndex extends Command
{
    /** @var ElasticSearchService Service used to talk to ElasticSearch. */
    protected $es;

    protected $signature = "es:drop {index_name : The name of index to be dropped} {--yes : Confirm the drop}";

    protected $description = 'Drop/Delete an ElasticSearch index';

    public function __construct(ElasticSearchService $elastic)
    {
        parent::__construct();
        $this->es = $elastic;
    }

    /**
     * Ask for confirmation (unless --yes was given) and drop the index.
     *
     * @return int 0 when the index was dropped or the user aborted, 1 when dropping failed
     */
    public function handle()
    {
        $index_name = $this->argument('index_name');

        // --yes skips the interactive prompt.
        $confirmed = $this->option('yes')
            || $this->confirm("Are you sure to drop the index $index_name ?");
        if (!$confirmed) {
            $this->warn('aborted');
            return 0;
        }

        if (!$this->es->dropIndex($index_name)) {
            $this->error('failed');
            return 1;
        }

        $this->info('success');
        return 0;
    }
}
使用
$ php artisan es:drop test_create

Are you sure to drop the index test_create ? (yes/no) [no]:
> yes

success
<?php

namespace App\Console\Commands;

use App\Services\Common\ElasticSearchService;
use Illuminate\Console\Command;

/**
 * ListIndex: Artisan command `es:list` that prints every index recorded by the
 * service together with its last update time.
 */
class ListIndex extends Command
{
    /** @var ElasticSearchService Service used to talk to ElasticSearch. */
    protected $es;

    protected $signature = 'es:list';

    protected $description = 'List all indices';

    public function __construct(ElasticSearchService $elastic)
    {
        parent::__construct();
        $this->es = $elastic;
    }

    /**
     * Print one numbered line per hit returned by the service.
     *
     * @return int always 0
     */
    public function handle()
    {
        $ret = $this->es->listIndex();

        echo '#:<index_name>, <last updated time>' . PHP_EOL;

        $count = 0;
        foreach ($ret['hits'] ?? [] as $hit) {
            $count++;
            $source = $hit['_source'] ?? [];
            $name = $source['index_name'] ?? '';
            // date() replaces strftime(), which is deprecated since PHP 8.1; for this
            // purely numeric format both produce the same text.
            $updated_time = date('Y-m-d H:i:s', (int) ($source['last_updated'] ?? 0));
            echo "$count : $name, $updated_time" . PHP_EOL;
        }

        return 0;
    }
}
使用
$ php artisan es:list
1 : help_center, 1970-01-01 08:00:00
2 : test_index_1, 1970-01-01 08:00:00
<?php

namespace App\Console\Commands;

use App\Services\Common\ElasticSearchService;
use Illuminate\Console\Command;

/**
 * QueryIndex: Artisan command `es:query` that runs a `field=value` query against
 * one index and dumps the result.
 */
class QueryIndex extends Command
{
    /** @var ElasticSearchService Service used to talk to ElasticSearch. */
    protected $es;

    protected $signature = "es:query {--index= : The index to query in} {--type= : The type of index to query} {--page=1 : The page of results} {query : The query to execute}";

    protected $description = 'Query something of the specified index';

    public function __construct(ElasticSearchService $elastic)
    {
        parent::__construct();
        $this->es = $elastic;
    }

    /**
     * Validate the options, run the query and print the raw result array.
     *
     * @return int 0 when the query was executed, 1 when the input is incomplete
     */
    public function handle()
    {
        $index_name = $this->option('index');
        $type_name = $this->option('type');
        $page_no = intval($this->option('page'));
        $query = (string) $this->argument('query');

        // Both options have no default; without this check a null would be passed
        // to the service's string parameters and raise a TypeError.
        if (empty($index_name) || empty($type_name)) {
            $this->error('both --index and --type are required');
            return 1;
        }

        // The service splits the query on "=" into field and value.
        if ('' !== $query && false === strpos($query, '=')) {
            $this->error('the query must have the form field=value');
            return 1;
        }

        $ret = $this->es->queryIndex($index_name, $type_name, $query, $page_no);
        print_r($ret);

        return 0;
    }
}
使用
$ php artisan es:query --index=game_name --type=text title=聚宝盆
Array
(
    [total] => 0
    [hits] => Array
        (
        )

)
Service
App\Services\Common\ElasticSearchService
<?php
namespace App\Services\Common;
use Elasticsearch\ClientBuilder;
use App\Concerns\Singleton;
use App\Support\LogTool;
use Illuminate\Support\Facades\DB;
class ElasticSearchService
{
use Singleton;
const BGC_FUN_META = 'bgc_fun_meta';
const SEARCH_HOT_WORD = 'bgc_fun_hot_word';
const DEFAULT_PAGE_SIZE = 10;
const DEFAULT_BULK_SIZE = 10000;
protected $config;
protected $client;
protected static $internal_indices = [
self::BGC_FUN_META,
];
/**
 * Build the service and its ElasticSearch client.
 *
 * When no configuration is passed, the `search-engine.elastic` config entry is
 * used. The `hosts` entry of the configuration is handed to the client builder,
 * and the client is built with 3 retries.
 *
 * @param array|null $config
 */
public function __construct(?array $config = null)
{
    // Explicit nullable type: the implicit form (`array $config = null`) is deprecated in PHP 8.4.
    $this->config = $config ?? config('search-engine.elastic');

    $this->client = ClientBuilder::create()
        ->setHosts($this->config['hosts'])
        ->setRetries(3)
        ->build();
}
/**
 * Create an index and register it in the meta index.
 *
 * The number of replicas is 1 when more than one node answered the node-stats
 * request and 0 otherwise. If the `search-engine.elastic.db_mapping.<index>`
 * config entry has a `mappings` key, it is sent as the index mappings.
 * For every index that is not one of the internal indices, a meta document
 * with `last_updated = 0` is written to the meta index.
 *
 * @param string $index_name
 * @return bool false when no node is available, the creation is not
 *              acknowledged, or the meta document could not be written
 */
public function createIndex(string $index_name)
{
    // Refuse to create anything when no node answered the stats request.
    $nodes_stat = $this->client->nodes()->stats();
    $num_nodes = $nodes_stat['_nodes']['successful'] ?? 0;
    if ($num_nodes < 1) {
        LogTool::error("try to create index $index_name without valid nodes.");
        return false;
    }

    $param = [
        'index' => $index_name,
        'body' => [
            'settings' => [
                'number_of_replicas' => ($num_nodes > 1 ? 1 : 0),
                'analysis' => [
                    // The index-wide default analyzer has to be declared under
                    // "analyzer"; placed directly under "analysis" it is not applied.
                    'analyzer' => [
                        'default' => [
                            //'type' => 'ik_max_word', // Chinese word segmentation
                            'type' => 'standard',
                        ],
                    ],
                ],
            ],
        ],
    ];

    $config = config("search-engine.elastic.db_mapping.{$index_name}");
    if (isset($config['mappings'])) {
        $param['body']['mappings'] = $config['mappings'];
    }

    $resp = $this->client->indices()->create($param);
    if (!$this->is_acknowledged($resp)) {
        LogTool::error("fail to create index $index_name");
        // Stop here: previously the meta update below overwrote the failure
        // and the method could report success for an index that was not created.
        return false;
    }

    if (in_array($index_name, static::$internal_indices, true)) {
        return true;
    }

    // Register the new index in the meta index; 0 marks it as "never synced".
    return $this->updateIndexDoc(static::BGC_FUN_META, '_doc', $index_name, [
        'index_name' => $index_name,
        'last_updated' => 0,
    ]);
}
/**
 * Delete an index and, unless it is the meta index itself, remove its meta document.
 *
 * @param string $index_name
 * @return bool whether the delete request was acknowledged
 */
public function dropIndex(string $index_name)
{
    $resp = $this->client->indices()->delete(['index' => $index_name]);
    if (!$this->is_acknowledged($resp)) {
        LogTool::error("fail to drop index $index_name");
        return false;
    }

    if ($index_name !== static::BGC_FUN_META) {
        // A successful drop is informational, not an error.
        LogTool::info("index $index_name dropped");
        try {
            $this->dropIndexDoc(static::BGC_FUN_META, '_doc', $index_name);
        } catch (\Exception $e) {
            // The index itself is already gone; a missing or undeletable meta
            // document must not turn the drop into a failure.
            LogTool::warning("index $index_name dropped, but its meta document could not be removed");
        }
    }

    return true;
}
/**
 * Remove every document of an index with a delete-by-query over `match_all`.
 *
 * @param string $index_name
 * @return bool true when the response carries a `deleted` entry
 */
public function clearIndex(string $index_name)
{
    $resp = $this->client->deleteByQuery([
        'index' => $index_name,
        'body' => [
            'query' => [
                // stdClass is used as the value of match_all.
                'match_all' => new \stdClass(),
            ],
        ],
    ]);

    if (isset($resp['deleted'])) {
        LogTool::info("index $index_name cleared.");
        return true;
    }

    LogTool::error("fail to clear index $index_name");
    return false;
}
/**
 * Re-index an index from the database rows selected by its configured SQL.
 *
 * The configured SQL is executed page by page with the named bindings
 * `count` and `offset`, and every page is bulk-indexed. On success the meta
 * document of the index receives the new `last_updated` value.
 *
 * @param string $index_name
 * @param string $type
 * @return bool true when nothing had to be done or the update succeeded
 */
public function updateIndex(string $index_name, string $type = 'text')
{
    if (is_null(config("search-engine.elastic.db_mapping"))) {
        LogTool::warning("try to update unregistered index $index_name, please make sure the index is configured.");
        return false;
    }

    // Without a database mapping there is nothing to synchronise.
    [$table_name, $sql] = $this->get_db_mapping($index_name);
    if (null === $sql) {
        return true;
    }

    // false means "no update needed"; 0 means "never synced yet".
    $update_time = $this->update_time4index($index_name);
    if (false === $update_time) {
        LogTool::info("try to update index $index_name, it is up to date.");
        return true;
    }

    // Create the index only when it is missing: creating an index that already
    // exists is rejected by ElasticSearch and surfaces as a client exception.
    if (!$this->client->indices()->exists(['index' => $index_name])
        && !$this->createIndex($index_name)
    ) {
        LogTool::error("fail to update $index_name");
        return false;
    }

    $page_no = 1;
    $page_size = static::DEFAULT_BULK_SIZE;
    $ret = true;
    do {
        $data = DB::select($sql, [
            'count' => $page_size,
            'offset' => ($page_no - 1) * $page_size,
        ]);
        $fetched = count($data);
        if ($fetched > 0) {
            $ret = $this->bulk_update_index($index_name, $type, $data);
        }
        $page_no++;
    } while ($fetched > 0 && $ret);

    if (!$ret) {
        LogTool::error("fail to update $index_name");
        return false;
    }

    LogTool::info("index $index_name is updated.");

    // For a never-synced index $update_time is 0; storing that 0 again would
    // make every later run rebuild the index. Fall back to the table's update
    // time, or to the current time when the database does not report one.
    $last_updated = $update_time > 0
        ? $update_time
        : (intval($this->get_db_last_update_time($table_name)) ?: time());

    return $this->updateIndexDoc(static::BGC_FUN_META, '_doc', $index_name, [
        'index_name' => $index_name,
        'last_updated' => $last_updated,
    ]);
}
/**
 * Update every index listed in the `search-engine.elastic.db_mapping` config.
 *
 * For each index, every key of its `mappings` entry is treated as a type name;
 * an index without `mappings` is updated once with the type `_doc`.
 *
 * @return array|bool true when every update succeeded (or nothing is configured),
 *                    otherwise a list of ['index' => ..., 'type' => ...] pairs that failed
 */
public function updateAllIndex()
{
    $config = config("search-engine.elastic.db_mapping");
    if (!is_array($config)) {
        // Nothing configured: iterating a null config would raise an error.
        LogTool::warning("try to update all indices, but no db_mapping is configured.");
        return true;
    }

    $index_failed = [];
    foreach ($config as $index_name => $index_config) {
        $mappings = $index_config['mappings'] ?? ['_doc' => 0];
        foreach (array_keys($mappings) as $type_name) {
            if (!$this->updateIndex((string) $index_name, (string) $type_name)) {
                $index_failed[] = ['index' => $index_name, 'type' => $type_name];
            }
        }
    }

    return count($index_failed) > 0 ? $index_failed : true;
}
/**
 * Query an index with a `field=value` string.
 *
 * An empty query string becomes a `match_all` query. When a
 * `search-query.<index>` config entry exists, its `body` is merged over the
 * generated query body (entries of the config win on equal keys).
 *
 * @param string $index_name
 * @param string $type
 * @param string $query `field=value`, or an empty string to match everything
 * @param int $page
 * @param int $page_size
 * @param bool $exact use a `term` query instead of a `match` query
 * @param bool $collect_source
 * @param callable|null $cb
 * @return array|mixed
 */
public function queryIndex(
    string $index_name,
    string $type,
    string $query,
    int $page = 1,
    int $page_size = self::DEFAULT_PAGE_SIZE,
    bool $exact = false,
    bool $collect_source = false,
    ?callable $cb = null
) {
    if (strlen($query) > 0) {
        [$query_param, $query_words] = $this->parse_query($query, $exact);
    } else {
        [$query_param, $query_words] = [['match_all' => new \stdClass()], ['']];
    }

    $query_body = ['query' => $query_param];

    $config = config("search-query.{$index_name}");
    if (null !== $config) {
        // A config entry without a "body" key must not break the query.
        $query_body = array_merge($query_body, $config['body'] ?? []);
    }

    return $this->queryIndexWithClause(
        $index_name, $type, $query_body, $query_words, $page, $page_size, $collect_source, $cb
    );
}
/**
 * List the documents of the meta index (at most the first 100).
 *
 * @return mixed array with the keys `total` and `hits`
 */
public function listIndex()
{
    $resp = $this->client->search([
        'index' => static::BGC_FUN_META,
        'type' => '_doc',
        'from' => 0,
        'size' => 100,
        'body' => [],
    ]);

    [$total, $hit_count] = $this->get_hits($resp);

    return $this->search_result($total, $hit_count, $resp);
}
/**
 * Index (insert or replace) one document under the given id.
 *
 * @param string $index_name
 * @param string $type
 * @param $id
 * @param array $content document body
 * @return mixed whether the response reports at least one successful shard
 */
public function updateIndexDoc(string $index_name, string $type, $id, array $content)
{
    $resp = $this->client->index([
        'index' => $index_name,
        'type' => $type,
        'id' => $id,
        'body' => $content,
    ]);

    return $this->shards_successful($resp);
}
/**
 * Delete one document by id.
 *
 * @param string $index_name
 * @param string $type
 * @param $id
 * @return bool whether the response reports at least one successful shard
 */
public function dropIndexDoc(string $index_name, string $type, $id)
{
    $resp = $this->client->delete([
        'index' => $index_name,
        'type' => $type,
        'id' => $id,
    ]);

    return $this->shards_successful($resp);
}
/**
 * Tell whether a response carries a truthy `acknowledged` entry.
 *
 * @param array $resp
 * @return bool
 */
protected function is_acknowledged(array &$resp)
{
    // !empty() is true exactly when the key is set and its value is truthy.
    return !empty($resp['acknowledged']);
}
/**
 * Tell whether a write response reports at least one successful shard.
 *
 * @param array $resp
 * @return bool false when the response has no `_shards` entry
 */
protected function shards_successful(array &$resp)
{
    if (!isset($resp['_shards'])) {
        return false;
    }

    return $resp['_shards']['successful'] > 0;
}
/**
 * Read the database mapping configured for an index.
 *
 * Looks up `search-engine.elastic.db_mapping.<index>` and returns its
 * `db_table` (with all whitespace removed) and `db_sql`.
 *
 * @param string $index_name
 * @return array [table name, sql], or [null, null] when either entry is missing
 */
protected function get_db_mapping(string $index_name)
{
    $mapping = config("search-engine.elastic.db_mapping.{$index_name}");

    // isset() also covers the case of a missing (null) config entry.
    if (!isset($mapping['db_table']) || !isset($mapping['db_sql'])) {
        return [null, null];
    }

    return [
        preg_replace('/\s/', '', $mapping['db_table']),
        $mapping['db_sql'],
    ];
}
/**
 * Decide whether an index has to be updated and with which timestamp.
 *
 * Compares the `last_updated` value stored in the meta index with the update
 * time reported for the mapped database table.
 *
 * @param string $index_name
 * @return bool|int false when no meta record exists or the index is up to date,
 *                  0 when the index has never been updated,
 *                  otherwise the table's update time (newer than the stored one)
 */
protected function update_time4index(string $index_name)
{
    [$table_name] = $this->get_db_mapping($index_name);
    // An index without a table mapping has no database timestamp to compare with.
    $db_update_time = null === $table_name
        ? 0
        : intval($this->get_db_last_update_time($table_name));

    $search_ret = $this->searchSimpleMatch(static::BGC_FUN_META, '_doc', 'index_name', $index_name);

    // The search is a non-exact "match", so it can return meta records of
    // other indices; only accept the record whose name is exactly this index.
    $meta = null;
    foreach ($search_ret['hits'] as $hit) {
        if (($hit['_source']['index_name'] ?? null) === $index_name) {
            $meta = $hit['_source'];
            break;
        }
    }
    if (null === $meta) {
        LogTool::warning("try to check should_update_index of $index_name, no meta hits.");
        return false;
    }

    $se_update_time = intval($meta['last_updated'] ?? 0);
    if (0 === $se_update_time) {
        return 0;
    }

    return $se_update_time < $db_update_time ? $db_update_time : false;
}
/**
 * Read the last update time of a table from `information_schema.tables`.
 *
 * The schema name is the `database` entry of the default database connection.
 *
 * @param string $table_name
 * @return int|null UNIX timestamp, or null when no row or no update time is reported
 */
protected function get_db_last_update_time(string $table_name)
{
    $connection = config('database.default');
    $schema = config("database.connections.{$connection}.database");

    $rows = DB::table('information_schema.tables')
        ->where([
            ['table_schema', '=', $schema],
            ['table_name', '=', $table_name],
        ])
        ->selectSub('UNIX_TIMESTAMP(update_time)', 'update_time')
        ->get();

    if (!isset($rows[0]->update_time)) {
        return null;
    }

    return intval($rows[0]->update_time);
}
/**
 * Extract the total hit count and the number of hits in the current page.
 *
 * `hits.total` is accepted in both shapes: a plain integer, or an array
 * carrying the count under `value`.
 *
 * @param array $resp search response
 * @return array [int total, int number of hits in this response]
 */
protected function get_hits(array &$resp)
{
    $raw_total = $resp['hits']['total'] ?? 0;
    // Reading ['value'] from an integer total would yield null and break
    // the int-typed consumers of this result.
    $total = is_array($raw_total)
        ? intval($raw_total['value'] ?? 0)
        : intval($raw_total);

    $hit_count = isset($resp['hits']['hits']) ? count($resp['hits']['hits']) : 0;

    return [$total, $hit_count];
}
/**
 * Build the uniform search result structure.
 *
 * @param int $total total number of matches
 * @param int $hits number of hits contained in this response
 * @param array $resp raw search response
 * @return mixed ['total' => int, 'hits' => array]; `hits` is empty when $hits is 0
 */
protected function search_result(int $total, int $hits, array &$resp)
{
    return [
        'total' => $total,
        'hits' => $hits > 0 ? $resp['hits']['hits'] : [],
    ];
}
/**
 * Bulk-index a set of database rows.
 *
 * Every row is sent as an `index` action followed by the row cast to an
 * array; a row with an `id` property uses it as the document id.
 *
 * @param string $index_name
 * @param string $type
 * @param iterable $data rows as objects
 * @return bool true when the bulk response reports no errors, or when there was nothing to send
 */
protected function bulk_update_index(string $index_name, string $type, iterable &$data)
{
    $body = [];
    foreach ($data as $datum) {
        $action = [
            '_index' => $index_name,
            '_type' => $type,
        ];
        if (isset($datum->id)) {
            $action['_id'] = $datum->id;
        }
        $body[] = ['index' => $action];
        $body[] = (array) $datum;
    }

    // A bulk request without a body is rejected; an empty data set is simply a no-op.
    if (0 === count($body)) {
        return true;
    }

    $resp = $this->client->bulk(['body' => $body]);

    return $this->bulk_succeed($resp);
}
/**
 * Tell whether a bulk response reports no errors.
 *
 * @param array $resp
 * @return bool false when the response has no `errors` entry or it is truthy
 */
protected function bulk_succeed(array &$resp)
{
    if (!isset($resp['errors'])) {
        return false;
    }

    return !$resp['errors'];
}
/**
 * Turn a `field=value` string into an ElasticSearch query clause.
 *
 * Only the first `=` separates field and value, so the value itself may
 * contain `=`. Field and value are trimmed.
 *
 * @param string $query `field=value`
 * @param bool $exact true builds a `term` query, false a `match` query
 * @return array [query clause, list containing the searched value]
 * @throws \InvalidArgumentException when the string has no `=` or an empty field name
 */
protected function parse_query(string $query, bool $exact = false)
{
    $method = $exact ? 'term' : 'match';

    // Limit 2: everything after the first "=" belongs to the value.
    $parts = explode('=', $query, 2);
    $field = trim($parts[0]);
    if (count($parts) < 2 || '' === $field) {
        throw new \InvalidArgumentException("query must have the form field=value, got: $query");
    }
    $value = trim($parts[1]);

    return [
        [$method => [$field => $value]],
        [$value],
    ];
}
/**
 * Search one field of an index for a value.
 *
 * @param string $index_name
 * @param string $type
 * @param string $field
 * @param string $needle
 * @param bool $exact true uses a `term` query, false (default) a `match` query
 * @return mixed array with the keys `total` and `hits`
 */
public function searchSimpleMatch(
    string $index_name,
    string $type,
    string $field,
    string $needle,
    bool $exact = false
) {
    $clause = [
        ($exact ? 'term' : 'match') => [$field => $needle],
    ];

    $resp = $this->client->search([
        'index' => $index_name,
        'type' => $type,
        'body' => ['query' => $clause],
    ]);

    [$total, $hit_count] = $this->get_hits($resp);

    return $this->search_result($total, $hit_count, $resp);
}
/**
 * Run a prepared query body against an index, one page at a time.
 *
 * A page number below 1 is treated as 1. When the first page of a search
 * on a non-internal index has matches, every non-empty search word is
 * recorded as a hot word.
 *
 * @param string $index_name
 * @param string $type
 * @param array $query_body request body sent to the search
 * @param array $search_words words to record as hot words
 * @param int $page 1-based page number
 * @param int $page_size
 * @param bool $collect_source reduce the result to the hits' source data
 * @param callable|null $cb optional transformation applied to the result
 * @return array|mixed
 */
public function queryIndexWithClause(
    string $index_name,
    string $type,
    array $query_body,
    array $search_words,
    int $page,
    int $page_size = self::DEFAULT_PAGE_SIZE,
    bool $collect_source = false,
    ?callable $cb = null
) {
    if ($page < 1) {
        $page = 1;
    }

    $resp = $this->client->search([
        'index' => $index_name,
        'type' => $type,
        'body' => $query_body,
        'from' => ($page - 1) * $page_size,
        'size' => $page_size,
    ]);

    [$total, $hits] = $this->get_hits($resp);
    $ret = $this->search_result($total, $hits, $resp);

    // The original condition compared the response array with 1 and could
    // never be true; the page number is what is meant to be checked, so that
    // paging through results does not count the same search repeatedly.
    // NOTE(review): inferred intent — confirm.
    $is_internal = in_array($index_name, [static::BGC_FUN_META, static::SEARCH_HOT_WORD], true);
    if (1 === $page && !$is_internal && $total > 0) {
        foreach ($search_words as $word) {
            if (!empty($word)) {
                $this->updateHotWord($index_name, $word);
            }
        }
    }

    if ($collect_source) {
        $ret = static::collectSource($ret);
    }
    if (is_callable($cb)) {
        $ret = $cb($ret);
    }

    return $ret;
}
/**
 * Run the query template configured under `search-query.<pattern_no>`.
 *
 * Every JSON string `"?"` in the template is replaced by the needle. The
 * template's `type` defaults to `text`.
 *
 * @param int $pattern_no
 * @param string $needle
 * @param int $page
 * @param int $page_size
 * @param bool $collect_source
 * @param callable|null $cb optional transformation applied to the result
 * @return array|mixed|null null when the pattern is not configured or is unusable
 */
public function queryWithPattern(
    int $pattern_no,
    string $needle,
    int $page,
    int $page_size = self::DEFAULT_PAGE_SIZE,
    bool $collect_source = false,
    ?callable $cb = null
) {
    $config = config("search-query.{$pattern_no}");
    if (null === $config) {
        return null;
    }

    // Encode the needle as a JSON string before splicing it into the
    // template: a raw quote or backslash would otherwise corrupt the JSON or
    // alter the structure of the query.
    $needle_json = json_encode($needle);
    $config_txt = json_encode($config);
    if (false === $needle_json || false === $config_txt) {
        LogTool::warning("fail to build query for pattern $pattern_no");
        return null;
    }
    $config = json_decode(str_replace('"?"', $needle_json, $config_txt), true);
    if (!is_array($config) || !isset($config['index'], $config['body'])) {
        LogTool::warning("fail to build query for pattern $pattern_no");
        return null;
    }

    $data = $this->queryIndexWithClause(
        $config['index'],
        $config['type'] ?? 'text',
        $config['body'],
        [$needle],
        $page,
        $page_size,
        $collect_source
    );

    if (is_callable($cb)) {
        $data = $cb($data);
    }

    return $data;
}
/**
 * Count one more search for a word in the hot-word index.
 *
 * The document id is `<index_name>:<word>`. An existing entry gets its
 * `count` incremented and `last_update` refreshed; a missing entry is
 * created with `count = 1`. Failures are logged and not propagated.
 *
 * @param string $index_name
 * @param string $word
 */
public function updateHotWord(string $index_name, string $word)
{
    $id = "$index_name:$word";
    $now = time();

    try {
        // Scripted update with upsert: the increment happens inside
        // ElasticSearch, and the upsert document is used when the entry does
        // not exist yet. The previous code never looked the entry up, so it
        // always overwrote the count with 1.
        $this->client->update([
            'index' => static::SEARCH_HOT_WORD,
            'type' => '_doc',
            'id' => $id,
            'body' => [
                'script' => [
                    'lang' => 'painless',
                    'source' => 'ctx._source.count += 1; ctx._source.last_update = params.now',
                    'params' => ['now' => $now],
                ],
                'upsert' => [
                    'count' => 1,
                    'word' => $word,
                    'index_name' => $index_name,
                    'last_update' => $now,
                ],
            ],
        ]);
    } catch (\Exception $e) {
        // Hot-word bookkeeping is best effort and must not break the search.
        LogTool::warning("fail to update hot word for $word");
    }
}
/**
 * Delete hot words whose `last_update` is lower than the given timestamp.
 *
 * @param int $before
 * @return mixed number of deleted documents, or false when the response has no `deleted` entry
 */
public function removeOldHotWord(int $before)
{
    $range = [
        'last_update' => ['lt' => $before],
    ];

    $resp = $this->client->deleteByQuery([
        'index' => static::SEARCH_HOT_WORD,
        'type' => '_doc',
        'body' => [
            'query' => ['range' => $range],
        ],
    ]);

    return $this->count_deleted($resp);
}
/**
 * Read the number of deleted documents from a delete-by-query response.
 *
 * @param array $resp
 * @return bool|mixed the `deleted` value of the response, or false when it is absent
 */
protected function count_deleted(array &$resp)
{
    return $resp['deleted'] ?? false;
}
/**
 * Reduce a search result to a list of source documents.
 *
 * Accepts either a result with a `hits` key or a bare list of hits. Hits
 * without `_source` are skipped. For a hit with highlights, the first
 * fragment of every highlighted field (with adjacent `</em><em>` pairs
 * removed) is merged over the source, replacing the field of the same name.
 *
 * @param array $result
 * @return array list of source documents, re-indexed from 0
 */
public static function collectSource(array $result)
{
    $rows = [];

    foreach (($result['hits'] ?? $result) as $hit) {
        if (!isset($hit['_source'])) {
            continue;
        }

        $row = $hit['_source'];
        if (isset($hit['highlight'])) {
            $fragments = [];
            foreach ($hit['highlight'] as $field => $snippets) {
                $fragments[$field] = str_replace('</em><em>', '', $snippets[0]);
            }
            $row = array_merge($row, $fragments);
        }

        $rows[] = $row;
    }

    return $rows;
}
}