csv-to-influxdb.py

import argparse
import csv
import datetime
import sys

from pytz import timezone
from influxdb import InfluxDBClient

# Reference epoch as a timezone-aware UTC datetime.
epoch_naive = datetime.datetime.utcfromtimestamp(0)
epoch = timezone('UTC').localize(epoch_naive)

def unix_time_millis(dt):
    """Milliseconds since the Unix epoch for a timezone-aware datetime."""
    return int((dt - epoch).total_seconds() * 1000)
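
# Quick sanity check (illustrative only): midnight UTC on 1970-01-02 is
# exactly 86,400,000 ms after the epoch.
#   dt = timezone('UTC').localize(datetime.datetime(1970, 1, 2))
#   assert unix_time_millis(dt) == 86400000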
##
## Check if data type of field is float
##
def isfloat(value):
    try:
        float(value)
        return True
    except (TypeError, ValueError):
        return False

##
## Check if data type of field is int
##
def isinteger(value):
    try:
        return float(value).is_integer()
    except (TypeError, ValueError):
        return False
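
# Illustrative behaviour of the type probes above:
#   isfloat('3.14')  -> True      isfloat('abc')   -> False
#   isinteger('42')  -> True      isinteger('4.2') -> False
# Note that isinteger('42.0') is also True, since float('42.0').is_integer().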
def loadCsv(inputfilename, servername, user, password, dbname, metric,
            timecolumn, timeformat, tagcolumns, fieldcolumns, usegzip,
            delimiter, batchsize, datatimezone):
    # NOTE: usegzip is accepted but never used; the --gzip option is currently a no-op.
    host = servername[0:servername.rfind(':')]
    port = int(servername[servername.rfind(':')+1:])
    client = InfluxDBClient(host, port, user, password, dbname)

    #print('Deleting database %s'%dbname)
    #client.drop_database(dbname)
    #print('Creating database %s'%dbname)
    #client.create_database(dbname)
    client.switch_user(user, password)

    # format tags and fields
    if tagcolumns:
        tagcolumns = tagcolumns.split(',')
    if fieldcolumns:
        fieldcolumns = fieldcolumns.split(',')

    # open csv
    datapoints = []
    count = 0
    with open(inputfilename, 'r') as csvfile:
        reader = csv.DictReader(csvfile, delimiter=delimiter)
        for row in reader:
            datetime_naive = datetime.datetime.strptime(row[timecolumn], timeformat)
            datetime_local = timezone(datatimezone).localize(datetime_naive)
            timestamp = unix_time_millis(datetime_local) * 1000000  # in nanoseconds
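            # InfluxDB interprets integer timestamps as nanoseconds by default,
            # which is why the epoch-milliseconds value is scaled by 1e6 above.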
            tags = {}
            for t in tagcolumns:
                v = 0
                if t in row:
                    v = row[t]
                tags[t] = v

            fields = {}
            for f in fieldcolumns:
                v = 0
                if f in row:
                    # store numeric-looking values as floats, everything else as strings
                    v = float(row[f]) if isfloat(row[f]) else row[f]
                fields[f] = v

            point = {"measurement": metric, "time": timestamp,
                     "fields": fields, "tags": tags}
            datapoints.append(point)
            count += 1

            if len(datapoints) % batchsize == 0:
                print('Read %d lines' % count)
                print('Inserting %d datapoints...' % (len(datapoints)))
                response = client.write_points(datapoints)
                if response is False:
                    print('Problem inserting points, exiting...')
                    sys.exit(1)
                print("Wrote %d, response: %s" % (len(datapoints), response))
                datapoints = []
    # write the remaining points
    if len(datapoints) > 0:
        print('Read %d lines' % count)
        print('Inserting %d datapoints...' % (len(datapoints)))
        response = client.write_points(datapoints)
        if response is False:
            print('Problem inserting points, exiting...')
            sys.exit(1)
        print("Wrote %d, response: %s" % (len(datapoints), response))

    print('Done')
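
# For a CSV row such as {'timestamp': '2020-01-01 00:00:00', 'host': 'srv1',
# 'value': '1.5'} (hypothetical column names), loadCsv with the default
# --metricname builds a point shaped like:
#   {"measurement": "value", "time": 1577836800000000000,
#    "fields": {"value": 1.5}, "tags": {"host": "srv1"}}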
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Csv to influxdb.')

    parser.add_argument('-i', '--input', nargs='?', required=True,
                        help='Input csv file.')
    parser.add_argument('-d', '--delimiter', nargs='?', required=False, default=',',
                        help='Csv delimiter. Default: \',\'.')
    parser.add_argument('-s', '--server', nargs='?', default='localhost:8086',
                        help='Server address. Default: localhost:8086')
    parser.add_argument('-u', '--user', nargs='?', default='root',
                        help='User name.')
    parser.add_argument('-p', '--password', nargs='?', default='root',
                        help='Password.')
    parser.add_argument('--dbname', nargs='?', required=True,
                        help='Database name.')
    parser.add_argument('-m', '--metricname', nargs='?', default='value',
                        help='Metric column name. Default: value')
    parser.add_argument('-tc', '--timecolumn', nargs='?', default='timestamp',
                        help='Timestamp column name. Default: timestamp.')
    parser.add_argument('-tf', '--timeformat', nargs='?', default='%Y-%m-%d %H:%M:%S',
                        help='Timestamp format. Default: \'%%Y-%%m-%%d %%H:%%M:%%S\' e.g.: 1970-01-01 00:00:00')
    parser.add_argument('-tz', '--timezone', default='UTC',
                        help='Timezone of supplied data. Default: UTC')
    parser.add_argument('--fieldcolumns', nargs='?', default='value',
                        help='List of csv columns to use as fields, separated by comma, e.g.: value1,value2. Default: value')
    parser.add_argument('--tagcolumns', nargs='?', default='host',
                        help='List of csv columns to use as tags, separated by comma, e.g.: host,data_center. Default: host')
    parser.add_argument('-g', '--gzip', action='store_true', default=False,
                        help='Compress before sending to influxdb.')
    parser.add_argument('-b', '--batchsize', type=int, default=5000,
                        help='Batch size. Default: 5000.')

    args = parser.parse_args()

    loadCsv(args.input, args.server, args.user, args.password, args.dbname,
            args.metricname, args.timecolumn, args.timeformat, args.tagcolumns,
            args.fieldcolumns, args.gzip, args.delimiter, args.batchsize, args.timezone)
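
# Example invocation (illustrative; assumes a local InfluxDB 1.x server and a
# file data.csv with columns "timestamp", "host" and "value"):
#
#   python csv-to-influxdb.py --input data.csv --dbname mydb \
#       --tagcolumns host --fieldcolumns value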