# csv-to-influxdb.py -- load rows from a CSV file into an InfluxDB database.
  1. import requests
  2. import json
  3. import gzip
  4. import argparse
  5. import csv
  6. import datetime
  7. from influxdb import InfluxDBClient
  8. epoch = datetime.datetime.utcfromtimestamp(0)
  9. def unix_time_millis(dt):
  10. return int((dt - epoch).total_seconds() * 1000)
  11. ##
  12. ## Check if data type of field is float
  13. ##
  14. def isfloat(value):
  15. try:
  16. float(value)
  17. return True
  18. except:
  19. return False
  20. ##
  21. ## Check if data type of field is int
  22. ##
  23. def isinteger(value):
  24. try:
  25. if(float(value).is_integer()):
  26. return True
  27. else:
  28. return False
  29. except:
  30. return False
  31. def loadCsv(inputfilename, servername, user, password, dbname, metric,
  32. timecolumn, timeformat, tagcolumns, fieldcolumns, usegzip,
  33. delimiter, batchsize, create):
  34. host = servername[0:servername.rfind(':')]
  35. port = int(servername[servername.rfind(':')+1:])
  36. client = InfluxDBClient(host, port, user, password, dbname)
  37. if(create == True):
  38. print('Deleting database %s'%dbname)
  39. client.drop_database(dbname)
  40. print('Creating database %s'%dbname)
  41. client.create_database(dbname)
  42. client.switch_user(user, password)
  43. # format tags and fields
  44. if tagcolumns:
  45. tagcolumns = tagcolumns.split(',')
  46. if fieldcolumns:
  47. fieldcolumns = fieldcolumns.split(',')
  48. # open csv
  49. datapoints = []
  50. inputfile = open(inputfilename, 'r')
  51. count = 0
  52. with open(inputfilename, 'r') as csvfile:
  53. reader = csv.DictReader(csvfile, delimiter=delimiter)
  54. for row in reader:
  55. timestamp = unix_time_millis(datetime.datetime.strptime(row[timecolumn],timeformat)) * 1000000 # in nanoseconds
  56. tags = {}
  57. for t in tagcolumns:
  58. v = 0
  59. if t in row:
  60. v = row[t]
  61. tags[t] = v
  62. fields = {}
  63. for f in fieldcolumns:
  64. v = 0
  65. if f in row:
  66. v = float(row[f]) if isfloat(row[f]) else row[f]
  67. fields[f] = v
  68. point = {"measurement": metric, "time": timestamp, "fields": fields, "tags": tags}
  69. datapoints.append(point)
  70. count+=1
  71. if len(datapoints) % batchsize == 0:
  72. print('Read %d lines'%count)
  73. print('Inserting %d datapoints...'%(len(datapoints)))
  74. response = client.write_points(datapoints)
  75. if response == False:
  76. print('Problem inserting points, exiting...')
  77. exit(1)
  78. print("Wrote %d, response: %s" % (len(datapoints), response))
  79. datapoints = []
  80. # write rest
  81. if len(datapoints) > 0:
  82. print('Read %d lines'%count)
  83. print('Inserting %d datapoints...'%(len(datapoints)))
  84. response = client.write_points(datapoints)
  85. if response == False:
  86. print('Problem inserting points, exiting...')
  87. exit(1)
  88. print("Wrote %d, response: %s" % (len(datapoints), response))
  89. print('Done')
  90. if __name__ == "__main__":
  91. parser = argparse.ArgumentParser(description='Csv to influxdb.')
  92. parser.add_argument('-i', '--input', nargs='?', required=True,
  93. help='Input csv file.')
  94. parser.add_argument('-d', '--delimiter', nargs='?', required=False, default=',',
  95. help='Csv delimiter. Default: \',\'.')
  96. parser.add_argument('-s', '--server', nargs='?', default='localhost:8086',
  97. help='Server address. Default: localhost:8086')
  98. parser.add_argument('-u', '--user', nargs='?', default='root',
  99. help='User name.')
  100. parser.add_argument('-p', '--password', nargs='?', default='root',
  101. help='Password.')
  102. parser.add_argument('--dbname', nargs='?', required=True,
  103. help='Database name.')
  104. parser.add_argument('--create', action='store_true', default=False,
  105. help='Drop database and create a new one.')
  106. parser.add_argument('-m', '--metricname', nargs='?', default='value',
  107. help='Metric column name. Default: value')
  108. parser.add_argument('-tc', '--timecolumn', nargs='?', default='timestamp',
  109. help='Timestamp column name. Default: timestamp.')
  110. parser.add_argument('-tf', '--timeformat', nargs='?', default='%Y-%m-%d %H:%M:%S',
  111. help='Timestamp format. Default: \'%%Y-%%m-%%d %%H:%%M:%%S\' e.g.: 1970-01-01 00:00:00')
  112. parser.add_argument('--fieldcolumns', nargs='?', default='value',
  113. help='List of csv columns to use as fields, separated by comma, e.g.: value1,value2. Default: value')
  114. parser.add_argument('--tagcolumns', nargs='?', default='host',
  115. help='List of csv columns to use as tags, separated by comma, e.g.: host,data_center. Default: host')
  116. parser.add_argument('-g', '--gzip', action='store_true', default=False,
  117. help='Compress before sending to influxdb.')
  118. parser.add_argument('-b', '--batchsize', type=int, default=5000,
  119. help='Batch size. Default: 5000.')
  120. args = parser.parse_args()
  121. loadCsv(args.input, args.server, args.user, args.password, args.dbname,
  122. args.metricname, args.timecolumn, args.timeformat, args.tagcolumns,
  123. args.fieldcolumns, args.gzip, args.delimiter, args.batchsize, args.create)