|
@@ -3,6 +3,7 @@ import gzip
|
|
|
import argparse
|
|
|
import csv
|
|
|
import datetime
|
|
|
+import json
|
|
|
from pytz import timezone
|
|
|
|
|
|
from influxdb import InfluxDBClient
|
|
@@ -38,7 +39,12 @@ def str2bool(value):
|
|
|
def isinteger(value):
|
|
|
try:
|
|
|
if(float(value).is_integer()):
|
|
|
- return True
|
|
|
+ # float(value).is_integer() alone is not enough: it is also True for a float written with a zero fraction (e.g. "20.0").
|
|
|
+ # So additionally check the raw text and only report an integer when it contains no '.'.
|
|
|
+ if value.find('.') == -1:
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ return False
|
|
|
else:
|
|
|
return False
|
|
|
except:
|
|
@@ -47,13 +53,26 @@ def isinteger(value):
|
|
|
|
|
|
def loadCsv(inputfilename, servername, user, password, dbname, metric,
|
|
|
timecolumn, timeformat, tagcolumns, fieldcolumns, usegzip,
|
|
|
- delimiter, batchsize, create, datatimezone, usessl):
|
|
|
+ delimiter, batchsize, create, datatimezone, usessl, showdata, dryrun, datatypes, tspass):
|
|
|
|
|
|
host = servername[0:servername.rfind(':')]
|
|
|
port = int(servername[servername.rfind(':')+1:])
|
|
|
+
|
|
|
+ if dryrun:
|
|
|
+ showdata = True
|
|
|
+
|
|
|
+ rpname = False
|
|
|
+ if dbname.find('.') != -1:
|
|
|
+ print("dbname contains a retention policy.")
|
|
|
+ tmpdbname = dbname.split('.')
|
|
|
+ dbname = tmpdbname[0]
|
|
|
+ rpname = tmpdbname[1]
|
|
|
+ print("dbname: " + dbname)
|
|
|
+ print("rpname: " + rpname)
|
|
|
+
|
|
|
client = InfluxDBClient(host, port, user, password, dbname, ssl=usessl)
|
|
|
|
|
|
- if(create == True):
|
|
|
+ if(create == True and dryrun == False):
|
|
|
print('Deleting database %s'%dbname)
|
|
|
client.drop_database(dbname)
|
|
|
print('Creating database %s'%dbname)
|
|
@@ -66,58 +85,131 @@ def loadCsv(inputfilename, servername, user, password, dbname, metric,
|
|
|
tagcolumns = tagcolumns.split(',')
|
|
|
if fieldcolumns:
|
|
|
fieldcolumns = fieldcolumns.split(',')
|
|
|
-
|
|
|
+
|
|
|
+ print()
|
|
|
+
|
|
|
+ fields_datatypes = dict()
|
|
|
+
|
|
|
+ if datatypes:
|
|
|
+ tmpdatatypes = datatypes.split(',')
|
|
|
+ print("specified data types:")
|
|
|
+ for tmpdatatype in tmpdatatypes:
|
|
|
+ dt = tmpdatatype.split('=')
|
|
|
+ fields_datatypes[dt[0]] = dt[1]
|
|
|
+ print("column '" + dt[0] + "' => " + dt[1])
|
|
|
+ else:
|
|
|
+ print("guessing data types from data in CSV row 2...")
|
|
|
+
|
|
|
# open csv
|
|
|
datapoints = []
|
|
|
inputfile = open(inputfilename, 'r')
|
|
|
count = 0
|
|
|
+
|
|
|
with inputfile as csvfile:
|
|
|
reader = csv.DictReader(csvfile, delimiter=delimiter)
|
|
|
+
|
|
|
for row in reader:
|
|
|
- datetime_naive = datetime.datetime.strptime(row[timecolumn],timeformat)
|
|
|
-
|
|
|
- if datetime_naive.tzinfo is None:
|
|
|
- datetime_local = timezone(datatimezone).localize(datetime_naive)
|
|
|
+
|
|
|
+ if showdata:
|
|
|
+ print("Input: ", row)
|
|
|
+
|
|
|
+ if not tspass:
|
|
|
+ datetime_naive = datetime.datetime.strptime(row[timecolumn],timeformat)
|
|
|
+
|
|
|
+ if datetime_naive.tzinfo is None:
|
|
|
+ datetime_local = timezone(datatimezone).localize(datetime_naive)
|
|
|
+ else:
|
|
|
+ datetime_local = datetime_naive
|
|
|
+
|
|
|
+ timestamp = unix_time_millis(datetime_local) * 1000000 # in nanoseconds
|
|
|
else:
|
|
|
- datetime_local = datetime_naive
|
|
|
-
|
|
|
- timestamp = unix_time_millis(datetime_local) * 1000000 # in nanoseconds
|
|
|
-
|
|
|
+ timestamp = row[timecolumn]
|
|
|
+
|
|
|
tags = {}
|
|
|
for t in tagcolumns:
|
|
|
v = 0
|
|
|
if t in row:
|
|
|
v = row[t]
|
|
|
tags[t] = v
|
|
|
-
|
|
|
+
|
|
|
fields = {}
|
|
|
for f in fieldcolumns:
|
|
|
v = 0
|
|
|
if f in row:
|
|
|
- if (isfloat(row[f])):
|
|
|
- v = float(row[f])
|
|
|
- elif (isbool(row[f])):
|
|
|
- v = str2bool(row[f])
|
|
|
+ skipfield = False
|
|
|
+ if count == 0 and not datatypes:
|
|
|
+ # first row, guess data types ONLY from there and remember them for the following rows
|
|
|
+ if (isinteger(row[f])):
|
|
|
+ print("column '" + f + "' = '" + str(row[f]) + "' => int")
|
|
|
+ fields_datatypes[f] = "int"
|
|
|
+ v = int(float(row[f]))
|
|
|
+ elif (isfloat(row[f])):
|
|
|
+ print("column '" + f + "' = '" + str(row[f]) + "' => float")
|
|
|
+ fields_datatypes[f] = "float"
|
|
|
+ v = float(row[f])
|
|
|
+ elif (isbool(row[f])):
|
|
|
+ print("column '" + f + "' = '" + str(row[f]) + "' => bool")
|
|
|
+ fields_datatypes[f] = "bool"
|
|
|
+ v = str2bool(row[f])
|
|
|
+ else:
|
|
|
+ print("column '" + f + "' = '" + str(row[f]) + "' => str")
|
|
|
+ fields_datatypes[f] = "str"
|
|
|
+ v = row[f]
|
|
|
else:
|
|
|
- v = row[f]
|
|
|
- fields[f] = v
|
|
|
-
|
|
|
-
|
|
|
- point = {"measurement": metric, "time": timestamp, "fields": fields, "tags": tags}
|
|
|
-
|
|
|
- datapoints.append(point)
|
|
|
- count+=1
|
|
|
+ # Not guessing here: either this is a later row (types were remembered from the first data row) or --datatypes was given.
|
|
|
+ # Check that the value fits the column's data type and skip the field if it does not (useful if there are a few missing values in the CSV).
|
|
|
+ if (fields_datatypes[f] == "int"):
|
|
|
+ if (isinteger(row[f])):
|
|
|
+ v = int(float(row[f]))
|
|
|
+ else:
|
|
|
+ skipfield = True
|
|
|
+ print("CSV row " + str(count+2) + ": skipped field '" + f + "' as it has a different data type.")
|
|
|
+ elif (fields_datatypes[f] == "float"):
|
|
|
+ if (isfloat(row[f])):
|
|
|
+ v = float(row[f])
|
|
|
+ else:
|
|
|
+ skipfield = True
|
|
|
+ print("CSV row " + str(count+2) + ": skipped field '" + f + "' as it has a different data type.")
|
|
|
+ elif (fields_datatypes[f] == "bool"):
|
|
|
+ if (isbool(row[f])):
|
|
|
+ v = str2bool(row[f])
|
|
|
+ else:
|
|
|
+ skipfield = True
|
|
|
+ print("CSV row " + str(count+2) + ": skipped field '", f, "' as it has a different data type.")
|
|
|
+ elif (fields_datatypes[f] == "str"):
|
|
|
+ v = row[f]
|
|
|
+
|
|
|
+ if not skipfield:
|
|
|
+ fields[f] = v
|
|
|
+
|
|
|
+ if len(fields) > 0:
|
|
|
+ point = {"measurement": metric, "time": timestamp, "fields": fields, "tags": tags}
|
|
|
+ if showdata:
|
|
|
+ print("Output: ", json.dumps(point, indent=3))
|
|
|
+
|
|
|
+ datapoints.append(point)
|
|
|
+ count+=1
|
|
|
+ else:
|
|
|
+ print("CSV row " + str(count+2) + ": skipped as it contains no field values.")
|
|
|
+ count+=1
|
|
|
|
|
|
if len(datapoints) % batchsize == 0:
|
|
|
print('Read %d lines'%count)
|
|
|
print('Inserting %d datapoints...'%(len(datapoints)))
|
|
|
- response = client.write_points(datapoints)
|
|
|
-
|
|
|
- if not response:
|
|
|
- print('Problem inserting points, exiting...')
|
|
|
- exit(1)
|
|
|
-
|
|
|
- print("Wrote %d points, up to %s, response: %s" % (len(datapoints), datetime_local, response))
|
|
|
+
|
|
|
+ #if showdata:
|
|
|
+ # print(json.dumps(datapoints, indent=3))
|
|
|
+ if not dryrun:
|
|
|
+ if rpname:
|
|
|
+ response = client.write_points(datapoints, retention_policy=rpname)
|
|
|
+ else:
|
|
|
+ response = client.write_points(datapoints)
|
|
|
+
|
|
|
+ if not response:
|
|
|
+ print('Problem inserting points, exiting...')
|
|
|
+ exit(1)
|
|
|
+
|
|
|
+ print("Wrote %d points, up to %s, response: %s" % (len(datapoints), datetime_local, response))
|
|
|
|
|
|
datapoints = []
|
|
|
|
|
@@ -126,15 +218,24 @@ def loadCsv(inputfilename, servername, user, password, dbname, metric,
|
|
|
if len(datapoints) > 0:
|
|
|
print('Read %d lines'%count)
|
|
|
print('Inserting %d datapoints...'%(len(datapoints)))
|
|
|
- response = client.write_points(datapoints)
|
|
|
-
|
|
|
- if response == False:
|
|
|
- print('Problem inserting points, exiting...')
|
|
|
- exit(1)
|
|
|
+
|
|
|
+ #if showdata:
|
|
|
+ # print(json.dumps(datapoints, indent=3))
|
|
|
+ if not dryrun:
|
|
|
+ if rpname:
|
|
|
+ response = client.write_points(datapoints, retention_policy=rpname)
|
|
|
+ else:
|
|
|
+ response = client.write_points(datapoints)
|
|
|
+
|
|
|
+ if response == False:
|
|
|
+ print('Problem inserting points, exiting...')
|
|
|
+ exit(1)
|
|
|
+ print("Wrote %d, response: %s" % (len(datapoints), response))
|
|
|
|
|
|
- print("Wrote %d, response: %s" % (len(datapoints), response))
|
|
|
|
|
|
print('Done')
|
|
|
+ if dryrun:
|
|
|
+ print('(actually did not change anything on the database, as --dryrun parameter was given.)')
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
parser = argparse.ArgumentParser(description='Csv to influxdb.')
|
|
@@ -158,8 +259,8 @@ if __name__ == "__main__":
|
|
|
help='Password.')
|
|
|
|
|
|
parser.add_argument('--dbname', nargs='?', required=True,
|
|
|
- help='Database name.')
|
|
|
-
|
|
|
+ help='Database name. Specify target Retention Policy: [DBNAME].[RPNAME]')
|
|
|
+
|
|
|
parser.add_argument('--create', action='store_true', default=False,
|
|
|
help='Drop database and create a new one.')
|
|
|
|
|
@@ -186,9 +287,21 @@ if __name__ == "__main__":
|
|
|
|
|
|
parser.add_argument('-b', '--batchsize', type=int, default=5000,
|
|
|
help='Batch size. Default: 5000.')
|
|
|
+
|
|
|
+ parser.add_argument('--showdata', action='store_true', default=False,
|
|
|
+ help='Print detailed information to the console what will be done with the data (or is intended to, when using --dryrun).')
|
|
|
+
|
|
|
+ parser.add_argument('--dryrun', action='store_true', default=False,
|
|
|
+ help='Do not change anything in the DB. Also enables --showdata.')
|
|
|
+
|
|
|
+ parser.add_argument('--datatypes', default=False,
|
|
|
+ help='Force specify data types for fields specified in --fieldcolumns: value1=int,value2=float,value3=bool,name=str ... Valid types: int, float, str, bool')
|
|
|
+
|
|
|
+ parser.add_argument('-tp', '--tspass', action='store_true', default=False,
|
|
|
+ help='Pass the timestamp from CSV directly to InfluxDB (do no conversion) - use only if the format is compatible to InfluxDB.')
|
|
|
|
|
|
args = parser.parse_args()
|
|
|
loadCsv(args.input, args.server, args.user, args.password, args.dbname,
|
|
|
args.metricname, args.timecolumn, args.timeformat, args.tagcolumns,
|
|
|
args.fieldcolumns, args.gzip, args.delimiter, args.batchsize, args.create,
|
|
|
- args.timezone, args.ssl)
|
|
|
+ args.timezone, args.ssl, args.showdata, args.dryrun, args.datatypes, args.tspass)
|