Client
Client是一个类,通过这个类可以实现对ES集群的各种操作:Index, Get, Delete , Search,以及对ES集群的管理任务。
Client的构造需要基于TransportClient
TransportClient
TransportClient可以远程连接ES集群,通过一个传输模块,但是它不真正的连接到集群,只是获取集群的一个或多个初始传输地址,在每次请求动作时,才真正连接到ES集群。
Settings
Settings类主要是在启动Client之前,配置一些属性参数,主要配置集群名称cluster.name,还有其他参数:
client.transport.sniff 是否为传输client添加嗅探功能
client.transport.ignore_cluster_name 设为true,忽略连接节点的集群名称验证
client.transport.ping_timeout 设置ping节点时的时间限,默认5s
client.transport.nodes_sampler_interval 设置sample/ping nodes listed 间隔时间,默认5s
//通过Settings类设置属性参数 Settings settings = Settings.settingsBuilder().put("cluster.name","index-name").build(); //启动Client Client client = TransportClient.builder().settings(settings).build(). addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.xxx.xxx"),9300)); //如果不需要设置参数,直接如下 /*Client client = TransportClient.builder().build(). addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.xxx.xxx"),9300));*/ //关闭Clinet client.close();
Document API
主要分为以下类:Index API , Get API , Delete API , Update API, Multi Get API, Bulk API
es中的增删改查
Index API可以索引一个典型的JSON文档到指定的索引中,并且可以使它可以检索。
产生JSON
JSON产生可以有以下几种方式:
手动拼接一个JSON字符串
使用Map
使用第三方库,比如Jackson
使用内置的XContentFactory.jsonBuilder()
每种类型都会转换为byte[],因此如果对象已经是这种形式,可以直接使用,jsonBuilder是一个高度优化了的JSON产生器,它直接构造byte[]
通过下边的代码讲解四种方法:index-api, get-api, delete-api, update-api
/** * es-api的方法学习: * 1.prepareIndex方法:索引数据到ElasticSearch * 2.prepareGet方法:获取信息 * 3.prepareDelete方法:删除信息 * 4.update方法:更新信息 * 4.1 upsert:在使用update方法时: * a:针对文档不存在的情况时,做出index数据的操作,update无效; * b:如果文档存在,那么index数据操作无效,update有效; */ public static void main(String[] args) throws IOException, InterruptedException, ExecutionException { //通过Settings类设置属性参数 Settings settings = Settings.settingsBuilder().put("cluster.name","myApp").build(); //启动Client Client client = null; try { client = TransportClient.builder().settings(settings).build(). addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("101.200.124.27"),9300)); } catch (UnknownHostException e) { e.printStackTrace(); } //执行操作 SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd"); XContentBuilder jsonBuilder = XContentFactory.jsonBuilder() .startObject() .field("user","yuchen") .field("interest","reading book") .field("insert_time",df.format(new Date())) .endObject(); //1.prepareIndex方法:索引数据到ElasticSearch IndexResponse response = client.prepareIndex("index-test","weibo","4") .setSource(jsonBuilder) .get(); String _index = response.getIndex(); String _type = response.getType(); String _id = response.getId(); long _version = response.getVersion(); boolean created = response.isCreated(); System.out.println(_index+" "+_type+" "+_id+" "+_version+" "+created); //2.prepareGet方法:获取信息 GetResponse getResponse = client.prepareGet("index-test","weibo","1").get(); System.out.println(getResponse.getSourceAsString()); //3.prepareDelete方法:删除信息 DeleteResponse deleteResponse = client.prepareDelete("index-test","weibo","4").get(); System.out.println(deleteResponse.isFound()); //4.update方法:更新信息 UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index("index-test"); updateRequest.type("weibo"); updateRequest.id("1"); updateRequest.doc(XContentFactory.jsonBuilder().startObject().field("interest","music").endObject()); UpdateResponse updateResponse = client.update(updateRequest).get(); System.out.println(updateResponse.isCreated()); //update方法: 可以为已有的文档添加新的字段 UpdateResponse updateResponse2 = client.prepareUpdate("index-test", "weibo", "1") .setDoc(XContentFactory.jsonBuilder() .startObject() .field("interest2","reading") .endObject()).get(); System.out.println(updateResponse2.isCreated()); //4.1 upsert:在使用update方法时,a:针对文档不存在的情况时,做出index数据的操作,update无效; // b:如果文档存在,那么index数据操作无效,update有效; //先构建一个IndexRequest IndexRequest indexRequest = new IndexRequest("index-test","weibo","14"); indexRequest.source(XContentFactory.jsonBuilder() .startObject() .field("user","yuchen2") .field("interest","eating") .field("insert_time",df.format(new Date())) .endObject()); //再构建一个UpdateRequest,并用IndexRequest关联 UpdateRequest updateRequest3 = new UpdateRequest("index-test","weibo","14"); updateRequest3.doc(XContentFactory.jsonBuilder() .startObject() .field("interest2","love") .endObject() ).upsert(indexRequest); client.update(updateRequest3).get(); if(client != null){ client.close(); } }
批量操作
Multi Get Api 和 Bulk Api可进行批量的增删改查
使用Multi Get Api 批量获取:
//1. Muti-get Api //可以指定单个id,也在index,type下指定一个id-list;也可以指定别的index/type MultiGetResponse multiGetResponse = client.prepareMultiGet() .add("index-test","weibo","1")//指定单个id .add("index-test","weibo","11","13","14")//指定一个id-list .add("index-other","news","1","3").get();//指定别的index/type for(MultiGetItemResponse item:multiGetResponse){ GetResponse response = item.getResponse(); System.out.println(response.getSourceAsString()); }
Bulk Api批量增加:
//2.Bulk Api:可以进行批量index和批量删除操作 //2.1批量增加 BulkRequestBuilder bulkRequest = client.prepareBulk(); bulkRequest.add(client.prepareIndex("index-test", "weibo", "20") .setSource(XContentFactory.jsonBuilder() .startObject() .field("user", "yuchen20") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) ); bulkRequest.add(client.prepareIndex("index-test", "weibo", "21") .setSource(XContentFactory.jsonBuilder() .startObject() .field("user", "yuchen21") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) ); BulkResponse bulkResponse = bulkRequest.get(); if(bulkResponse.hasFailures()){ //... }
Bulk Api批量删除:
//2.2批量删除 BulkRequestBuilder bulkRequest = client.prepareBulk(); bulkRequest.add(client.prepareDelete("index-test", "weibo", "20") ); bulkRequest.add(client.prepareDelete("index-test", "weibo", "21") ); BulkResponse bulkResponse = bulkRequest.get(); if(bulkResponse.hasFailures()){ System.out.println("bulk error:"+bulkResponse.buildFailureMessage()); }
Bulk Api 批量更新
//2.3批量更新 BulkRequestBuilder bulkRequest = client.prepareBulk(); bulkRequest.add(client.prepareUpdate("index-test", "weibo", "11").setDoc(XContentFactory .jsonBuilder().startObject() .field("country","China")//新添加字段 .endObject() ) ); bulkRequest.add(client.prepareUpdate("index-test", "weibo", "13").setDoc(XContentFactory .jsonBuilder().startObject() .field("user","yuchen13")//更新字段 .endObject() ) ); BulkResponse bulkResponse = bulkRequest.get(); if(bulkResponse.hasFailures()){ System.out.println("bulk error:"+bulkResponse.buildFailureMessage()); }
BulkProcessor设置批量请求的属性
//BulkProcessor BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { @Override public void beforeBulk(long arg0, BulkRequest arg1) { //批量执行前做的事情 System.out.println("bulk api action starting..."); } @Override public void afterBulk(long arg0, BulkRequest arg1, Throwable arg2) { System.out.println("exception:bukl api action ending...:"+arg2.getMessage()); } @Override public void afterBulk(long arg0, BulkRequest arg1, BulkResponse arg2) { //正常执行完毕后... System.out.println("normal:bukl api action ending..."); } }) //设置多种条件,对批量操作进行限制,达到限制中的任何一种触发请求的批量提交 .setBulkActions(1000)//设置批量操作一次性执行的action个数,根据请求个数批量提交 //.setBulkSize(new ByteSizeValue(1,ByteSizeUnit.KB))//设置批量提交请求的大小允许的最大值 //.setFlushInterval(TimeValue.timeValueMillis(100))//根据时间周期批量提交请求 //.setConcurrentRequests(1)//设置允许并发请求的数量 //设置请求失败时的补偿措施,重复请求3次 //.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) .build(); for(int i =0;i<100000;i++){ bulkProcessor.add(new IndexRequest("index-test","weibo2",""+i).source( XContentFactory .jsonBuilder() .startObject() .field("name","yuchen"+i) .field("interest","love"+i) .endObject())); } bulkProcessor.awaitClose(5, TimeUnit.MINUTES);//释放bulkProcessor资源 System.out.println("load succeed!");默认的参数:
sets bulkActions to
1000
sets bulkSize to
5mb
does not set flushInterval
sets concurrentRequests to 1
sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.
参考地址:
http://blog.csdn.net/wuyzhen_csdn/article/details/52381697