|
@@ -11,15 +11,38 @@ epoch = datetime.datetime.utcfromtimestamp(0)
|
|
|
def unix_time_millis(dt):
|
|
|
return int((dt - epoch).total_seconds() * 1000)
|
|
|
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+def isfloat(value):
|
|
|
+ try:
|
|
|
+ float(value)
|
|
|
+ return True
|
|
|
+ except:
|
|
|
+ return False
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+def isinteger(value):
|
|
|
+ try:
|
|
|
+ if(float(value).is_integer()):
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ return False
|
|
|
+ except:
|
|
|
+ return False
|
|
|
+
|
|
|
+
|
|
|
def loadCsv(inputfilename, servername, user, password, dbname, metric, timecolumn, timeformat, tagcolumns, fieldcolumns, usegzip, delimiter, batchsize):
|
|
|
|
|
|
host = servername[0:servername.rfind(':')]
|
|
|
port = int(servername[servername.rfind(':')+1:])
|
|
|
client = InfluxDBClient(host, port, user, password, dbname)
|
|
|
|
|
|
- print 'Deleting database %s'%dbname
|
|
|
+ print('Deleting database %s'%dbname)
|
|
|
client.drop_database(dbname)
|
|
|
- print 'Creating database %s'%dbname
|
|
|
+ print('Creating database %s'%dbname)
|
|
|
client.create_database(dbname)
|
|
|
client.switch_user(user, password)
|
|
|
|
|
@@ -49,7 +72,7 @@ def loadCsv(inputfilename, servername, user, password, dbname, metric, timecolum
|
|
|
for f in fieldcolumns:
|
|
|
v = 0
|
|
|
if f in row:
|
|
|
- v = float(row[f])
|
|
|
+ v = float(row[f]) if isfloat(row[f]) else row[f]
|
|
|
fields[f] = v
|
|
|
|
|
|
|
|
@@ -59,15 +82,15 @@ def loadCsv(inputfilename, servername, user, password, dbname, metric, timecolum
|
|
|
count+=1
|
|
|
|
|
|
if len(datapoints) % batchsize == 0:
|
|
|
- print 'Read %d lines'%count
|
|
|
- print 'Inserting %d datapoints...'%(len(datapoints))
|
|
|
+ print('Read %d lines'%count)
|
|
|
+ print('Inserting %d datapoints...'%(len(datapoints)))
|
|
|
response = client.write_points(datapoints)
|
|
|
|
|
|
if response == False:
|
|
|
- print 'Problem inserting points, exiting...'
|
|
|
+ print('Problem inserting points, exiting...')
|
|
|
exit(1)
|
|
|
|
|
|
- print "Wrote %d, response: %s" % (len(datapoints), response)
|
|
|
+ print("Wrote %d, response: %s" % (len(datapoints), response))
|
|
|
|
|
|
|
|
|
datapoints = []
|
|
@@ -75,17 +98,17 @@ def loadCsv(inputfilename, servername, user, password, dbname, metric, timecolum
|
|
|
|
|
|
|
|
|
if len(datapoints) > 0:
|
|
|
- print 'Read %d lines'%count
|
|
|
- print 'Inserting %d datapoints...'%(len(datapoints))
|
|
|
+ print('Read %d lines'%count)
|
|
|
+ print('Inserting %d datapoints...'%(len(datapoints)))
|
|
|
response = client.write_points(datapoints)
|
|
|
|
|
|
if response == False:
|
|
|
- print 'Problem inserting points, exiting...'
|
|
|
+ print('Problem inserting points, exiting...')
|
|
|
exit(1)
|
|
|
|
|
|
- print "Wrote %d, response: %s" % (len(datapoints), response)
|
|
|
+ print("Wrote %d, response: %s" % (len(datapoints), response))
|
|
|
|
|
|
- print 'Done'
|
|
|
+ print('Done')
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
parser = argparse.ArgumentParser(description='Csv to influxdb.')
|