import requests
import gzip
import argparse
import csv
import datetime
import json
from pytz import timezone
from influxdb import InfluxDBClient

epoch_naive = datetime.datetime.utcfromtimestamp(0)
epoch = timezone('UTC').localize(epoch_naive)

def unix_time_millis(dt):
    return int((dt - epoch).total_seconds() * 1000)

##
## Check if data type of field is float
##
def isfloat(value):
    try:
        float(value)
        return True
    except (ValueError, TypeError):
        return False

##
## Check if data type of field is bool
##
def isbool(value):
    try:
        return value.lower() in ('true', 'false')
    except AttributeError:
        return False

def str2bool(value):
    return value.lower() == 'true'

##
## Check if data type of field is int
##
def isinteger(value):
    try:
        if float(value).is_integer():
            # Don't rely on .is_integer() alone, as it also returns True for a
            # float with zero decimals (e.g. 20.0). Check the raw text as well
            # and don't report int if it contains a dot.
            if value.find('.') == -1:
                return True
            else:
                return False
        else:
            return False
    except (ValueError, TypeError, AttributeError):
        return False

# Note: usegzip is accepted for CLI compatibility but is not currently used.
def loadCsv(inputfilename, servername, user, password, dbname, metric,
    timecolumn, timeformat, tagcolumns, fieldcolumns, usegzip,
    delimiter, batchsize, create, datatimezone, usessl, showdata, dryrun, datatypes, tspass):

    host = servername[0:servername.rfind(':')]
    port = int(servername[servername.rfind(':')+1:])

    if dryrun:
        showdata = True

    rpname = False
    if dbname.find('.') != -1:
        print("dbname contains a retention policy.")
        tmpdbname = dbname.split('.')
        dbname = tmpdbname[0]
        rpname = tmpdbname[1]
        print("dbname: " + dbname)
        print("rpname: " + rpname)

    client = InfluxDBClient(host, port, user, password, dbname, ssl=usessl)

    if create and not dryrun:
        print('Deleting database %s'%dbname)
        client.drop_database(dbname)
        print('Creating database %s'%dbname)
        client.create_database(dbname)

    client.switch_user(user, password)

    # format tags and fields
    if tagcolumns:
        tagcolumns = tagcolumns.split(',')
    if fieldcolumns:
        fieldcolumns = fieldcolumns.split(',')

    print()
    fields_datatypes = dict()
    if datatypes:
        tmpdatatypes = datatypes.split(',')
        print("specified data types:")
        for tmpdatatype in tmpdatatypes:
            dt = tmpdatatype.split('=')
            fields_datatypes[dt[0]] = dt[1]
            print("column '" + dt[0] + "' => " + dt[1])
    else:
        print("guessing data types from data in CSV row 2...")
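
    # Illustrative example (hypothetical column names): passing
    # --datatypes temperature=float,status=str leaves fields_datatypes as
    # {'temperature': 'float', 'status': 'str'}; without --datatypes the same
    # mapping is built from the first data row below.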

    # open csv
    datapoints = []
    count = 0

    with open(inputfilename, 'r') as csvfile:
        reader = csv.DictReader(csvfile, delimiter=delimiter)

        for row in reader:

            if showdata:
                print("Input: ", row)

            if not tspass:
                datetime_naive = datetime.datetime.strptime(row[timecolumn], timeformat)

                if datetime_naive.tzinfo is None:
                    datetime_local = timezone(datatimezone).localize(datetime_naive)
                else:
                    datetime_local = datetime_naive

                timestamp = unix_time_millis(datetime_local) * 1000000 # in nanoseconds
            else:
                timestamp = row[timecolumn]
                # also set datetime_local here, so the batch progress message
                # further down works in --tspass mode
                datetime_local = row[timecolumn]

            tags = {}
            for t in tagcolumns:
                v = 0
                if t in row:
                    v = row[t]
                tags[t] = v
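
            # Illustrative example (hypothetical values): with --tagcolumns host
            # and --fieldcolumns value, a row {'timestamp': '2019-01-01 00:00:00',
            # 'host': 'server01', 'value': '1.2'} yields tags {'host': 'server01'}
            # and, after the typing logic below, fields {'value': 1.2}.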
            fields = {}
            for f in fieldcolumns:
                v = 0
                if f in row:
                    skipfield = False
                    if count == 0 and not datatypes:
                        # first row: guess data types ONLY from here and remember them for the following rows
                        if isinteger(row[f]):
                            print("column '" + f + "' = '" + str(row[f]) + "' => int")
                            fields_datatypes[f] = "int"
                            v = int(float(row[f]))
                        elif isfloat(row[f]):
                            print("column '" + f + "' = '" + str(row[f]) + "' => float")
                            fields_datatypes[f] = "float"
                            v = float(row[f])
                        elif isbool(row[f]):
                            print("column '" + f + "' = '" + str(row[f]) + "' => bool")
                            fields_datatypes[f] = "bool"
                            v = str2bool(row[f])
                        else:
                            print("column '" + f + "' = '" + str(row[f]) + "' => str")
                            fields_datatypes[f] = "str"
                            v = row[f]
                    else:
                        # from the 2nd row on, only use the data types guessed from row 1.
                        # Check that the data type of each column still fits and skip the
                        # value if not (useful if there are a few missing values in the CSV).
                        if fields_datatypes[f] == "int":
                            if isinteger(row[f]):
                                v = int(float(row[f]))
                            else:
                                skipfield = True
                                print("CSV row " + str(count+2) + ": skipped field '" + f + "' as it has a different data type.")
                        elif fields_datatypes[f] == "float":
                            if isfloat(row[f]):
                                v = float(row[f])
                            else:
                                skipfield = True
                                print("CSV row " + str(count+2) + ": skipped field '" + f + "' as it has a different data type.")
                        elif fields_datatypes[f] == "bool":
                            if isbool(row[f]):
                                v = str2bool(row[f])
                            else:
                                skipfield = True
                                print("CSV row " + str(count+2) + ": skipped field '" + f + "' as it has a different data type.")
                        elif fields_datatypes[f] == "str":
                            v = row[f]

                    if not skipfield:
                        fields[f] = v

            if len(fields) > 0:
                point = {"measurement": metric, "time": timestamp, "fields": fields, "tags": tags}
                if showdata:
                    print("Output: ", json.dumps(point, indent=3))
                datapoints.append(point)
            else:
                print("CSV row " + str(count+2) + ": skipped as it contains no field values.")
            count += 1

            # guard against flushing an empty batch right after a write when
            # the following rows were all skipped
            if len(datapoints) > 0 and len(datapoints) % batchsize == 0:
                print('Read %d lines'%count)
                print('Inserting %d datapoints...'%(len(datapoints)))

                if not dryrun:
                    if rpname:
                        response = client.write_points(datapoints, retention_policy=rpname)
                    else:
                        response = client.write_points(datapoints)

                    if not response:
                        print('Problem inserting points, exiting...')
                        exit(1)

                    print("Wrote %d points, up to %s, response: %s" % (len(datapoints), datetime_local, response))
                datapoints = []

    # write rest
    if len(datapoints) > 0:
        print('Read %d lines'%count)
        print('Inserting %d datapoints...'%(len(datapoints)))

        if not dryrun:
            if rpname:
                response = client.write_points(datapoints, retention_policy=rpname)
            else:
                response = client.write_points(datapoints)

            if not response:
                print('Problem inserting points, exiting...')
                exit(1)

            print("Wrote %d points, response: %s" % (len(datapoints), response))

    print('Done')
    if dryrun:
        print('(actually did not change anything on the database, as --dryrun parameter was given.)')
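
# Illustrative sketch (not part of the CLI): loadCsv() can also be called
# directly from Python. The file name and column names here are hypothetical.
#
#   loadCsv('data.csv', 'localhost:8086', 'root', 'root', 'mydb',
#           'weather', 'timestamp', '%Y-%m-%d %H:%M:%S', 'host',
#           'temperature,humidity', False, ',', 5000, False,
#           'UTC', False, True, True, None, False)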

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Csv to influxdb.')

    parser.add_argument('-i', '--input', nargs='?', required=True,
                        help='Input csv file.')

    parser.add_argument('-d', '--delimiter', nargs='?', required=False, default=',',
                        help='Csv delimiter. Default: \',\'.')

    parser.add_argument('-s', '--server', nargs='?', default='localhost:8086',
                        help='Server address. Default: localhost:8086')

    parser.add_argument('--ssl', action='store_true', default=False,
                        help='Use HTTPS instead of HTTP.')

    parser.add_argument('-u', '--user', nargs='?', default='root',
                        help='User name.')

    parser.add_argument('-p', '--password', nargs='?', default='root',
                        help='Password.')

    parser.add_argument('--dbname', nargs='?', required=True,
                        help='Database name. Specify target Retention Policy: [DBNAME].[RPNAME]')

    parser.add_argument('--create', action='store_true', default=False,
                        help='Drop database and create a new one.')

    parser.add_argument('-m', '--metricname', nargs='?', default='value',
                        help='Metric column name. Default: value')

    parser.add_argument('-tc', '--timecolumn', nargs='?', default='timestamp',
                        help='Timestamp column name. Default: timestamp.')

    parser.add_argument('-tf', '--timeformat', nargs='?', default='%Y-%m-%d %H:%M:%S',
                        help='Timestamp format. Default: \'%%Y-%%m-%%d %%H:%%M:%%S\' e.g.: 1970-01-01 00:00:00')

    parser.add_argument('-tz', '--timezone', default='UTC',
                        help='Timezone of supplied data. Default: UTC')

    parser.add_argument('--fieldcolumns', nargs='?', default='value',
                        help='List of csv columns to use as fields, separated by comma, e.g.: value1,value2. Default: value')

    parser.add_argument('--tagcolumns', nargs='?', default='host',
                        help='List of csv columns to use as tags, separated by comma, e.g.: host,data_center. Default: host')

    parser.add_argument('-g', '--gzip', action='store_true', default=False,
                        help='Compress before sending to influxdb.')

    parser.add_argument('-b', '--batchsize', type=int, default=5000,
                        help='Batch size. Default: 5000.')

    parser.add_argument('--showdata', action='store_true', default=False,
                        help='Print detailed information to the console about what will be done with the data (or is intended to be done, when using --dryrun).')

    parser.add_argument('--dryrun', action='store_true', default=False,
                        help='Do not change anything in the DB. Also enables --showdata.')

    parser.add_argument('--datatypes', default=False,
                        help='Force data types for the fields specified in --fieldcolumns: value1=int,value2=float,value3=bool,name=str ... Valid types: int, float, str, bool')

    parser.add_argument('-tp', '--tspass', action='store_true', default=False,
                        help='Pass the timestamp from the CSV directly to InfluxDB (no conversion) - use only if the format is compatible with InfluxDB.')

    args = parser.parse_args()

    loadCsv(args.input, args.server, args.user, args.password, args.dbname,
        args.metricname, args.timecolumn, args.timeformat, args.tagcolumns,
        args.fieldcolumns, args.gzip, args.delimiter, args.batchsize, args.create,
        args.timezone, args.ssl, args.showdata, args.dryrun, args.datatypes, args.tspass)
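
# Example usage (a sketch with hypothetical script, file, and column names):
#
#   python csv-to-influxdb.py --input data.csv --dbname mydb \
#       --timecolumn time --fieldcolumns temperature,humidity \
#       --tagcolumns host --dryrun
#
# for a CSV file such as:
#
#   time,host,temperature,humidity
#   2019-01-01 00:00:00,server01,21.5,40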