使用ES的bulk接口导入批量数据(python进行格式化,curl提交)

#!/bin/env python
# -*- coding: utf-8 -*-
'''
修改:2015/9/25 ver.2
原因:ver.1 要使用elasticsearch的官方库,不方便,这版使用bulk接口,curl提交
修改:2015/9/30 ver.3
原因:封装成class,方便调用
'''

import sys
import os
from optparse import OptionParser
from datetime import datetime
import subprocess as sub
import json

class loadDataToES:
    def __init__(self, field_desc, data_file, host='127.0.0.1', port='9200', index='test', doctype='others', delimeter=',', tmp_file='/dev/shm/_tmp_data_to_es', cut_off=10000):
        self.host = host
        self.port = port
        self.index = index
        self.doctype = doctype
        self.delimeter = delimeter
        self.tmp_file = tmp_file
        self.field_desc = field_desc
        self.data_file = data_file
        self.header = '{"index":{"_index":"%s", "_type":"%s"}}' %(self.index, self.doctype)
        self.cut_off = cut_off
        self.url = 'http://%s:%s/_bulk' %(self.host, self.port)

    def load_data(self):
        '''
        expample data from the file:
        2015-09-24 09:17:29,memory_11601,123988
        '''
        self.body_list = []
        self.bulk = ''
        self.line_num = 0
        self.pretty_print('INFO: loadding data to es, host: %s, index: %s' %(self.host, self.index))
        self.parse_field()
        with open(self.data_file, 'r') as f_desc:
            for line in f_desc:
                self.do_line(line)
                self.line_num += 1
                if self.line_num >= self.cut_off:
                    self.bulk_content = '\n'.join(self.body_list)
                    self._load_data()
                    self.body_list = []
                    self.bulk = ''
                    self.line_num = 0
            if self.line_num > 0:
                self.bulk_content = '\n'.join(self.body_list)
                self._load_data()
        self.pretty_print('INFO: all lines parsed.')

    def do_line(self, line):
        fields = line.strip().split(self.delimeter)
        if len(fields) != self.field_len:
            self.pretty_print("ERROR: line %d not match fields" % line_num)
            return
        body_tmp = str(self.get_body(fields, self.fields_list))
        self.body_list.append(self.header)
        self.body_list.append(body_tmp.replace("'", '"'))


    def parse_field(self):
        fields_list = []
        fields_desc = self.field_desc.strip().split(self.delimeter)
        for item in fields_desc:
            items = item.split('|')
            fields_list.append([items[0], items[1]])
        self.fields_list = fields_list
        self.field_len = len(fields_list)

    def _load_data(self):
        open(self.tmp_file, 'w').write(self.bulk_content)
        p = sub.Popen(['curl', '-s', '-XPOST', self.url, '--data-binary', "@" + self.tmp_file ], stdout=sub.PIPE)
        for line in iter(p.stdout.readline, b''):
            ret_dict = json.loads(line)
            if not ret_dict['errors']:
                self.pretty_print("INFO: %6s lines parseed with no errors, total cost %d ms." %(len(ret_dict['items']), ret_dict['took']))
            else:
                self.pretty_print("ERROR: %6s lines parseed with some errors, total cost %d ms." %(len(ret_dict['items']), ret_dict['took']))

    def pretty_print(self, str):
        print('%s %s' %(datetime.now(), str))

    def get_body(self, fields, fields_list):
        counter = 0
        body = {}
        while (counter < len(fields)):
            # if the data type is 'date', we will translate the value str to date
            # type
            if fields_list[counter][1] == 'date':
                body[fields_list[counter][0]] = self.translate_str_to_date(
                    fields[counter])
            # and if the data type is 'int', we translate it to int
            elif fields_list[counter][1] == 'int':
                body[fields_list[counter][0]] = self.translate_str_to_int(
                    fields[counter])
            elif fields_list[counter][1] == 'float':
                body[fields_list[counter][0]] = self.translate_str_to_float(
                    fields[counter])
            # other is defalut to be str
            else:
                body[fields_list[counter][0]] = fields[counter]
            counter += 1
        # print(my_body)
        return body

    def translate_str_to_date(self, date_str):
        try:
            date = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
            return date.isoformat()
        except:
            self.pretty_print("Unexpected error: %s" % (sys.exc_info()[0]))
            self.pretty_print("Failed to translate '%s' to date." % (date_str))
        return False


    def translate_str_to_int(self, num_str):
        try:
            return int(num_str)
        except:
            self.pretty_print("Failed to translate '%s' to int." % (num_str))
        return False


    def translate_str_to_float(self, num_str):
        try:
            return float(num_str)
        except:
            self.pretty_print("Failed to translate '%s' to int." % (num_str))
        return False

if __name__ == '__main__':
    '''
    example fields_desc:@timestamp|date,process|str,mem|int
    example lines in file:
    2015-09-24 09:17:29,memory_11601,203532
    2015-09-24 09:17:29,memory_11602,223112
    '''
    loader = loadDataToES(field_desc='@timestamp|date,process|str,mem|int', data_file='/root/scripts/in.data', host='10.21.102.60', index = 'test')
    loader.load_data()

编程技巧