# csv-to-influxdb.py — load rows from a CSV file into an InfluxDB database.
  1. import requests
  2. import gzip
  3. import argparse
  4. import csv
  5. import datetime
  6. from pytz import timezone
  7. from influxdb import InfluxDBClient
  8. epoch_naive = datetime.datetime.utcfromtimestamp(0)
  9. epoch = timezone('UTC').localize(epoch_naive)
  10. def unix_time_millis(dt):
  11. return int((dt - epoch).total_seconds() * 1000)
  12. ##
  13. ## Check if data type of field is float
  14. ##
  15. def isfloat(value):
  16. try:
  17. float(value)
  18. return True
  19. except:
  20. return False
  21. def isbool(value):
  22. try:
  23. return value.lower() in ('true', 'false')
  24. except:
  25. return False
  26. def str2bool(value):
  27. return value.lower() == 'true'
  28. ##
  29. ## Check if data type of field is int
  30. ##
  31. def isinteger(value):
  32. try:
  33. if(float(value).is_integer()):
  34. return True
  35. else:
  36. return False
  37. except:
  38. return False
  39. def loadCsv(inputfilename, servername, user, password, dbname, metric,
  40. timecolumn, timeformat, tagcolumns, fieldcolumns, usegzip,
  41. delimiter, batchsize, create, datatimezone, usessl):
  42. host = servername[0:servername.rfind(':')]
  43. port = int(servername[servername.rfind(':')+1:])
  44. client = InfluxDBClient(host, port, user, password, dbname, ssl=usessl)
  45. if(create == True):
  46. print('Deleting database %s'%dbname)
  47. client.drop_database(dbname)
  48. print('Creating database %s'%dbname)
  49. client.create_database(dbname)
  50. client.switch_user(user, password)
  51. # format tags and fields
  52. if tagcolumns:
  53. tagcolumns = tagcolumns.split(',')
  54. if fieldcolumns:
  55. fieldcolumns = fieldcolumns.split(',')
  56. # open csv
  57. datapoints = []
  58. inputfile = open(inputfilename, 'r')
  59. count = 0
  60. with inputfile as csvfile:
  61. reader = csv.DictReader(csvfile, delimiter=delimiter)
  62. for row in reader:
  63. datetime_naive = datetime.datetime.strptime(row[timecolumn],timeformat)
  64. if datetime_naive.tzinfo is None:
  65. datetime_local = timezone(datatimezone).localize(datetime_naive)
  66. else:
  67. datetime_local = datetime_naive
  68. timestamp = unix_time_millis(datetime_local) * 1000000 # in nanoseconds
  69. tags = {}
  70. for t in tagcolumns:
  71. v = 0
  72. if t in row:
  73. v = row[t]
  74. tags[t] = v
  75. fields = {}
  76. for f in fieldcolumns:
  77. v = 0
  78. if f in row:
  79. if (isfloat(row[f])):
  80. v = float(row[f])
  81. elif (isbool(row[f])):
  82. v = str2bool(row[f])
  83. else:
  84. v = row[f]
  85. fields[f] = v
  86. point = {"measurement": metric, "time": timestamp, "fields": fields, "tags": tags}
  87. datapoints.append(point)
  88. count+=1
  89. if len(datapoints) % batchsize == 0:
  90. print('Read %d lines'%count)
  91. print('Inserting %d datapoints...'%(len(datapoints)))
  92. response = client.write_points(datapoints)
  93. if not response:
  94. print('Problem inserting points, exiting...')
  95. exit(1)
  96. print("Wrote %d points, up to %s, response: %s" % (len(datapoints), datetime_local, response))
  97. datapoints = []
  98. # write rest
  99. if len(datapoints) > 0:
  100. print('Read %d lines'%count)
  101. print('Inserting %d datapoints...'%(len(datapoints)))
  102. response = client.write_points(datapoints)
  103. if response == False:
  104. print('Problem inserting points, exiting...')
  105. exit(1)
  106. print("Wrote %d, response: %s" % (len(datapoints), response))
  107. print('Done')
  108. if __name__ == "__main__":
  109. parser = argparse.ArgumentParser(description='Csv to influxdb.')
  110. parser.add_argument('-i', '--input', nargs='?', required=True,
  111. help='Input csv file.')
  112. parser.add_argument('-d', '--delimiter', nargs='?', required=False, default=',',
  113. help='Csv delimiter. Default: \',\'.')
  114. parser.add_argument('-s', '--server', nargs='?', default='localhost:8086',
  115. help='Server address. Default: localhost:8086')
  116. parser.add_argument('--ssl', action='store_true', default=False,
  117. help='Use HTTPS instead of HTTP.')
  118. parser.add_argument('-u', '--user', nargs='?', default='root',
  119. help='User name.')
  120. parser.add_argument('-p', '--password', nargs='?', default='root',
  121. help='Password.')
  122. parser.add_argument('--dbname', nargs='?', required=True,
  123. help='Database name.')
  124. parser.add_argument('--create', action='store_true', default=False,
  125. help='Drop database and create a new one.')
  126. parser.add_argument('-m', '--metricname', nargs='?', default='value',
  127. help='Metric column name. Default: value')
  128. parser.add_argument('-tc', '--timecolumn', nargs='?', default='timestamp',
  129. help='Timestamp column name. Default: timestamp.')
  130. parser.add_argument('-tf', '--timeformat', nargs='?', default='%Y-%m-%d %H:%M:%S',
  131. help='Timestamp format. Default: \'%%Y-%%m-%%d %%H:%%M:%%S\' e.g.: 1970-01-01 00:00:00')
  132. parser.add_argument('-tz', '--timezone', default='UTC',
  133. help='Timezone of supplied data. Default: UTC')
  134. parser.add_argument('--fieldcolumns', nargs='?', default='value',
  135. help='List of csv columns to use as fields, separated by comma, e.g.: value1,value2. Default: value')
  136. parser.add_argument('--tagcolumns', nargs='?', default='host',
  137. help='List of csv columns to use as tags, separated by comma, e.g.: host,data_center. Default: host')
  138. parser.add_argument('-g', '--gzip', action='store_true', default=False,
  139. help='Compress before sending to influxdb.')
  140. parser.add_argument('-b', '--batchsize', type=int, default=5000,
  141. help='Batch size. Default: 5000.')
  142. args = parser.parse_args()
  143. loadCsv(args.input, args.server, args.user, args.password, args.dbname,
  144. args.metricname, args.timecolumn, args.timeformat, args.tagcolumns,
  145. args.fieldcolumns, args.gzip, args.delimiter, args.batchsize, args.create,
  146. args.timezone, args.ssl)