<?php

/*
 * Elasticsearch helper notes.
 *
 * This file gathers several related pieces:
 *   1. the Elastic singleton wrapper (namespace app\common\logic);
 *   2. the console command that creates the "chat" index (namespace app\command);
 *   3. example snippets: building a bool query, a paginated aggregation query,
 *      and bulk-loading rows into Elasticsearch.
 *
 * NOTE(review): the original notes carried a second opening PHP tag in front of
 * section 2, which suggests each section lives in its own file in the project.
 * The snippets in section 3 rely on variables from their calling context and
 * are reference material, not something to execute from this file.
 */

namespace app\common\logic;

//require '/vendor/autoload.php';
use app\common\exception\InvalidArgumentsException;
use Elasticsearch\ClientBuilder;
use Elasticsearch\Common\Exceptions\ElasticCloudIdParseException;
use think\facade\Log;

/**
 * Singleton wrapper around the Elasticsearch PHP client.
 *
 * Connection settings are read from the `es.hosts`, `es.user` and `es.passwd`
 * configuration keys.
 */
class Elastic
{
    /** Elasticsearch client built by ClientBuilder. */
    private $client;

    /** The single shared instance of this class. */
    static private $instance;

    /**
     * Private so the only way to obtain an instance is getInstance().
     */
    private function __construct()
    {
        $this->client = ClientBuilder::create()
            ->setHosts(config('es.hosts'))
            ->setBasicAuthentication(config('es.user'), config('es.passwd'))
            ->build();
    }

    /**
     * Private clone handler: prevents cloning the instance from outside the class.
     */
    private function __clone()
    {
    }

    /**
     * Return the shared instance, creating it on first use.
     *
     * @return self
     */
    static public function getInstance()
    {
        // Create the instance only if one has not been built yet.
        if (!self::$instance instanceof self) {
            self::$instance = new self();
        }

        return self::$instance;
    }

    /**
     * Bulk-insert (create or replace) documents.
     *
     * Each element of $params is one document and must carry an `id` key, which
     * is used as the Elasticsearch `_id`.
     *
     * @param array  $params list of documents
     * @param string $index  target index name
     * @return bool true when every document was accepted (or there was nothing
     *              to send); false when the request failed or any item failed
     */
    public function addAllDos(array $params, $index = 'chat')
    {
        // Nothing to index: skip the request instead of sending an empty bulk body.
        if (empty($params)) {
            return true;
        }

        $bulk = ['body' => []];
        foreach ($params as $param) {
            // Action line: "index" creates the document or replaces an existing one.
            $bulk['body'][] = [
                'index' => [
                    '_index' => $index,
                    '_id'    => $param['id'],
                ],
            ];
            // Source line: the document itself.
            $bulk['body'][] = $param;
        }

        try {
            $response = $this->client->bulk($bulk);
        } catch (\Exception $e) {
            Log::write('插入失败: ' . $e->getMessage(), 'error');
            return false;
        }

        // The bulk API reports per-item failures through the "errors" flag of
        // the response rather than by throwing.
        if (!empty($response['errors'])) {
            Log::write('插入失败: bulk response reported item errors', 'error');
            return false;
        }

        return true;
    }

    /**
     * Partially update a single document.
     *
     * @param int    $id    document id
     * @param array  $body  fields to update (sent as the `doc` of the update request)
     * @param string $index index name
     * @return bool false when the client threw, true otherwise
     */
    public function updateDoc(int $id, array $body, string $index): bool
    {
        $params = [
            'index' => $index,
            'id'    => $id,
            'body'  => [
                'doc' => $body,
            ],
        ];

        try {
            $this->client->update($params);
        } catch (\Exception $e) {
            Log::write("更新文档{$id}失败: " . $e->getMessage(), 'error');
            return false;
        }

        return true;
    }

    /**
     * Delete a single document.
     *
     * @param int    $id    document id
     * @param string $index index name
     * @return bool false when the client threw, true otherwise
     */
    public function deleteDoc(int $id, string $index): bool
    {
        $params = [
            'index' => $index,
            'id'    => $id,
        ];

        try {
            $this->client->delete($params);
        } catch (\Exception $e) {
            Log::write("删除文档{$id}失败: " . $e->getMessage(), 'error');
            return false;
        }

        return true;
    }

    /**
     * Check whether an index exists. Only a single index name can be checked.
     *
     * @param string $index index name
     * @return bool
     */
    public function indexExists($index)
    {
        $response = $this->client->indices()->exists(['index' => $index]);

        return $response;
    }

    /**
     * Query cheat sheet (shapes accepted as the $query argument of getList()).
     *
     * 'query' => [
     *     Multi-field match:
     *     'multi_match' => [
     *         'query'  => $keywords,
     *         'fields' => ['title', 'content', 'keyword'],
     *         'type'   => 'most_fields' // most_fields favours matching across many fields;
     *                                   // best_fields favours a complete match in one field
     *     ],
     *     Single-field match:
     *     'match' => [
     *         'title' => $keywords
     *     ],
     *     Exact phrase match:
     *     'match_phrase' => [
     *         'title' => $keywords
     *     ],
     *     Combined query:
     *     'bool' => [
     *         'should' => [ // equivalent to OR
     *             [ 'match_phrase' => [ 'title' => $keywords ] ],
     *             [ 'match_phrase' => [ 'content' => $keywords ] ],
     *             [ 'match_phrase' => [ 'keyword' => $keywords ] ],
     *         ],
     *         'must' => [ // equivalent to AND
     *             [ 'match' => [ 'title' => $keywords ] ],
     *         ],
     *         'should' => [ // equivalent to OR
     *             [ 'match' => [ 'title' => $keywords ] ],
     *             [ 'match' => [ 'content' => $keywords ] ],
     *             [ 'match' => [ 'keyword' => $keywords ] ],
     *         ],
     *         'must_not' => [ // equivalent to NOT
     *             [ 'match' => [ 'content' => $keywords ] ]
     *         ],
     *         'filter' => [ // filter: gt greater than; gte greater or equal; lt less than; lte less or equal
     *             'range' => [
     *                 'id' => ['lt' => 20598, 'gt' => 20590]
     *             ]
     *         ],
     *     ],
     * ],
     * 'highlight' => [ // highlighting of the search terms
     *     // 'pre_tags'  => "<p class='key' style='color: red;'>", // custom highlight style
     *     // 'post_tags' => "</p>",
     *     'fields' => [ // fields to highlight
     *         'title'   => (object)[],
     *         'content' => (object)[],
     *         'keyword' => (object)[],
     *     ]
     * ],
     */

    /**
     * Fetch a page of documents, or run an aggregation, against an index.
     *
     * When $query contains an `aggs` key, that key is moved out of the query
     * into the request's `aggs` section, `size` is forced to 0 and the raw
     * aggregation result is returned. Otherwise the `_source` of every hit is
     * returned together with the total hit count.
     *
     * @param string $index      index name
     * @param array  $query      Elasticsearch query DSL (may also carry `aggs`);
     *                           an empty query matches all documents
     * @param string $orderField field to sort by
     * @param string $sort       sort direction, `asc` or `desc`
     * @param int    $limit      page size
     * @param int    $page       1-based page number
     * @return array ['code' => 200, 'msg' => '', 'data' => ..., 'total' => int];
     *               `total` is omitted for aggregation results
     * @throws InvalidArgumentsException when the search request fails
     */
    public function getList($index, $query = [], $orderField = 'id', $sort = 'desc', $limit = 10, $page = 1)
    {
        $limit = (int)$limit;
        // Clamp so that page <= 0 does not produce a negative "from".
        $offset = max(0, ((int)$page - 1) * $limit);

        // Aggregations travel inside $query for the caller's convenience but
        // belong next to "query" in the request body, not inside it.
        $aggs = null;
        if (isset($query['aggs'])) {
            $aggs = $query['aggs'];
            unset($query['aggs']);
        }

        // An empty PHP array would be serialised as a JSON array, which is not
        // a valid query; fall back to matching everything.
        if (empty($query)) {
            $query = ['match_all' => (object)[]];
        }

        $params = [
            'index' => $index,
            'body'  => [
                'query' => $query,
                'sort'  => [[$orderField => ['order' => $sort]]],
                'from'  => $offset,
                'size'  => $limit,
            ],
        ];

        if ($aggs !== null) {
            $params['body']['aggs'] = $aggs;
            // Only the aggregation buckets are wanted, not the hits.
            $params['body']['size'] = 0;
        }

        try {
            $response = $this->client->search($params);
        } catch (\Exception $e) {
            throw new InvalidArgumentsException("es查询错误".$e->getMessage(),400);
        }

        // Aggregation result.
        if (isset($response['aggregations'])) {
            return ['code' => 200, 'msg' => '', 'data' => $response['aggregations']];
        }

        $data = [];
        $total = 0;
        if (isset($response['hits']['total']['value'])) {
            $total = $response['hits']['total']['value'];
            if ($total > 0) {
                // $response['hits']['hits'] is the actual list of documents.
                foreach ($response['hits']['hits'] as $hit) {
                    $data[] = $hit['_source'];
                }
            }
        }

        return ['code' => 200, 'msg' => '', 'data' => $data, 'total' => $total];
    }

    /**
     * Create an index together with its field mapping.
     *
     * @param string $index_name index name
     * @param array  $body       field mapping (`properties`); when empty the
     *                           built-in chat mapping below is used
     * @param array  $settings   index settings overriding the defaults
     *                           (5 primary shards, 1 replica)
     * @return array response of the create-index request
     */
    public function create_mappings($index_name = 'chat', $body = [], array $settings = [])
    {
        // Field types: text, integer, float, double, boolean, date
        if (empty($body)) {
            $body = [
                'id'           => ['type' => 'long'],
                'from_id'      => ['type' => 'text'],
                'from_name'    => ['type' => 'text'],
                'from_avatar'  => ['type' => 'text'],
                'to_id'        => ['type' => 'text'],
                'to_name'      => ['type' => 'text'],
                'seller_code'  => ['type' => 'text'],
                'content'      => ['type' => 'text', 'index' => true, 'analyzer' => 'ik_max_word'],
                'read_flag'    => ['type' => 'integer'],
                'content_type' => ['type' => 'integer'],
                'client_id'    => ['type' => 'long'],
                'remark'       => ['type' => 'text', 'index' => true, 'analyzer' => 'ik_max_word'],
                'create_time'  => ['type' => 'long'],
            ];
        }

        $defaultSettings = [
            'number_of_shards'   => 5, // number of primary shards
            'number_of_replicas' => 1, // replicas per primary shard
        ];

        $params = [
            'index' => $index_name, // index name
            'body'  => [
                'settings' => array_merge($defaultSettings, $settings),
                'mappings' => [
                    '_source' => [ // store the original document
                        'enabled' => true,
                    ],
                    'properties' => $body, // field structure and types
                ],
            ],
        ];

        $response = $this->client->indices()->create($params);

        return $response;
    }
}

// ---------------------------------------------------------------------------
// Console command that creates the index.
// ---------------------------------------------------------------------------

namespace app\command;

use app\common\logic\Elastic;
use think\console\Command;
use think\console\Input;
use think\console\Output;

/**
 * `createesindex` console command: creates the "chat" index with its mapping.
 */
class CreateEsIndex extends Command
{
    protected function configure()
    {
        // Command configuration
        $this->setName('createesindex');
        // Set arguments here if needed
    }

    protected function execute(Input $input, Output $output)
    {
        // Command output
        //$output->writeln('createesindex');

        // Field types: text, long, integer, float, double, boolean, date, keyword
        $body = [
            'id'           => ['type' => 'long'],
            'from_id'      => ['type' => 'keyword'],
            // The keyword sub-field "raw" is what the terms aggregation on
            // from_name.raw (see the aggregation example below) groups by.
            'from_name'    => ['type' => 'text', 'fielddata' => true, 'fields' => ['raw' => ['type' => 'keyword']]],
            'image'        => ['type' => 'text'],
            'to_id'        => ['type' => 'keyword'],
            'to_name'      => ['type' => 'text'],
            'code'         => ['type' => 'text'],
            'content'      => ['type' => 'text', 'index' => true, 'analyzer' => 'ik_max_word'],
            'status'       => ['type' => 'integer'],
            'content_type' => ['type' => 'integer'],
            'client_code'  => ['type' => 'long'],
            'create_time'  => ['type' => 'long'],
        ];

        $settings = [
            'number_of_shards'   => 5, // number of primary shards
            'number_of_replicas' => 0, // replicas per primary shard
        ];

        // Elastic keeps its client private and has no indices() method, so the
        // index is created through create_mappings().
        $response = Elastic::getInstance()->create_mappings('chat', $body, $settings);

        return $response;
    }
}

// ---------------------------------------------------------------------------
// Example: building a bool query from optional conditions.
// $start_time, $end_time, $param, $customerCode, $page, $all and $sellerCode
// come from the calling context.
// ---------------------------------------------------------------------------

$query = [
    'bool' => [
        'must'   => [],
        'filter' => [
            'range' => [
                'create_time' => [
                    'gt' => (int)$start_time,
                    'lt' => (int)$end_time,
                ],
            ],
        ],
        // 'minimum_should_match'=>1,
    ],
];

if (!empty($param['keyword'])) {
    $query['bool']['must'][] = [
        'match_phrase' => [
            'content' => $param['keyword'],
        ],
    ];
}

// Match either the sender or the receiver
if ($customerCode) {
    $query['bool']['should'][] = [
        'match' => [
            'to_id' => $customerCode,
        ],
    ];
    $query['bool']['should'][] = [
        'match' => [
            'from_id' => $customerCode,
        ],
    ];
    // OR query: at least one of the should clauses must match
    $query['bool']['minimum_should_match'] = 1;
}

// A page below 0 pages upward; otherwise show the current `limit` records
if (isset($param['id'])) {
    if ($page > 0) {
        $query['bool']['must'][] = [
            'range' => [
                'id' => ['gte' => $param['id']],
            ],
        ];
    } else {
        $page = -$page;
        $query['bool']['must'][] = [
            'range' => [
                'id' => ['lt' => $param['id']],
            ],
        ];
    }
}

// Full search: unless $all is set, restrict the results to one seller
if (!$all) {
    $query['bool']['must'][] = [
        'match' => [
            'seller_code' => $sellerCode,
        ],
    ];
}

// ---------------------------------------------------------------------------
// Example: paginated aggregation query.
// ---------------------------------------------------------------------------

$size = $param['page'] * $param['limit'];
$query = [
    'bool' => [
        'must' => [
            [
                'match_phrase' => [
                    'from_name' => $param['name'],
                ],
            ],
            [
                'match_phrase' => [
                    'to_id' => $param['code'],
                ],
            ],
        ],
        // 'minimum_should_match'=>1,
    ],
    'aggs' => [
        'group_from_name' => [
            'terms' => [
                'field' => 'from_name.raw',
                'size'  => $size,
            ],
            'aggs' => [
                'agency_names' => [
                    'top_hits' => [
                        'size'    => 1,
                        '_source' => [
                            'includes' => ['from_id', 'from_name', 'from_avatar'],
                        ],
                    ],
                ],
            ],
        ],
    ],
];

// ---------------------------------------------------------------------------
// Example: loading rows into ES in chunks.
// ---------------------------------------------------------------------------

$chat = new Chat();
$chat->chunk(300, function ($chatlogs) use ($output) {
    foreach ($chatlogs as $key => $chatlog) {
        // Convert create_time to a Unix timestamp before writing to ES
        $chatlogs[$key]['create_time'] = strtotime($chatlog['create_time']);
    }
    // NOTE(review): addAllDos() is declared with an array parameter; if chunk()
    // hands over a collection object here it has to be converted first — confirm.
    Elastic::getInstance()->addAllDos($chatlogs);
});