# csv-to-influxdb.py — load rows of a CSV file into an InfluxDB database.
  1. import requests
  2. import json
  3. import gzip
  4. import argparse
  5. import csv
  6. import datetime
  7. from pytz import timezone
  8. from influxdb import InfluxDBClient
  9. epoch_naive = datetime.datetime.utcfromtimestamp(0)
  10. epoch = timezone('UTC').localize(epoch_naive)
  11. def unix_time_millis(dt):
  12. return int((dt - epoch).total_seconds() * 1000)
  13. ##
  14. ## Check if data type of field is float
  15. ##
  16. def isfloat(value):
  17. try:
  18. float(value)
  19. return True
  20. except:
  21. return False
  22. def isbool(value):
  23. try:
  24. return value.lower() in ('true', 'false')
  25. except:
  26. return False
  27. def str2bool(value):
  28. return value.lower() == 'true'
  29. ##
  30. ## Check if data type of field is int
  31. ##
  32. def isinteger(value):
  33. try:
  34. if(float(value).is_integer()):
  35. return True
  36. else:
  37. return False
  38. except:
  39. return False
  40. def loadCsv(inputfilename, servername, user, password, dbname, metric,
  41. timecolumn, timeformat, tagcolumns, fieldcolumns, usegzip,
  42. delimiter, batchsize, create, datatimezone):
  43. host = servername[0:servername.rfind(':')]
  44. port = int(servername[servername.rfind(':')+1:])
  45. client = InfluxDBClient(host, port, user, password, dbname)
  46. if(create == True):
  47. print('Deleting database %s'%dbname)
  48. client.drop_database(dbname)
  49. print('Creating database %s'%dbname)
  50. client.create_database(dbname)
  51. client.switch_user(user, password)
  52. # format tags and fields
  53. if tagcolumns:
  54. tagcolumns = tagcolumns.split(',')
  55. if fieldcolumns:
  56. fieldcolumns = fieldcolumns.split(',')
  57. # open csv
  58. datapoints = []
  59. inputfile = open(inputfilename, 'r')
  60. count = 0
  61. with open(inputfilename, 'r') as csvfile:
  62. reader = csv.DictReader(csvfile, delimiter=delimiter)
  63. for row in reader:
  64. datetime_naive = datetime.datetime.strptime(row[timecolumn],timeformat)
  65. datetime_local = timezone(datatimezone).localize(datetime_naive)
  66. timestamp = unix_time_millis(datetime_local) * 1000000 # in nanoseconds
  67. tags = {}
  68. for t in tagcolumns:
  69. v = 0
  70. if t in row:
  71. v = row[t]
  72. tags[t] = v
  73. fields = {}
  74. for f in fieldcolumns:
  75. v = 0
  76. if f in row:
  77. if (isfloat(row[f])):
  78. v = float(row[f])
  79. elif (isbool(row[f])):
  80. v = str2bool(row[f])
  81. else:
  82. v = row[f]
  83. fields[f] = v
  84. point = {"measurement": metric, "time": timestamp, "fields": fields, "tags": tags}
  85. datapoints.append(point)
  86. count+=1
  87. if len(datapoints) % batchsize == 0:
  88. print('Read %d lines'%count)
  89. print('Inserting %d datapoints...'%(len(datapoints)))
  90. response = client.write_points(datapoints)
  91. if response == False:
  92. print('Problem inserting points, exiting...')
  93. exit(1)
  94. print("Wrote %d, response: %s" % (len(datapoints), response))
  95. datapoints = []
  96. # write rest
  97. if len(datapoints) > 0:
  98. print('Read %d lines'%count)
  99. print('Inserting %d datapoints...'%(len(datapoints)))
  100. response = client.write_points(datapoints)
  101. if response == False:
  102. print('Problem inserting points, exiting...')
  103. exit(1)
  104. print("Wrote %d, response: %s" % (len(datapoints), response))
  105. print('Done')
  106. if __name__ == "__main__":
  107. parser = argparse.ArgumentParser(description='Csv to influxdb.')
  108. parser.add_argument('-i', '--input', nargs='?', required=True,
  109. help='Input csv file.')
  110. parser.add_argument('-d', '--delimiter', nargs='?', required=False, default=',',
  111. help='Csv delimiter. Default: \',\'.')
  112. parser.add_argument('-s', '--server', nargs='?', default='localhost:8086',
  113. help='Server address. Default: localhost:8086')
  114. parser.add_argument('-u', '--user', nargs='?', default='root',
  115. help='User name.')
  116. parser.add_argument('-p', '--password', nargs='?', default='root',
  117. help='Password.')
  118. parser.add_argument('--dbname', nargs='?', required=True,
  119. help='Database name.')
  120. parser.add_argument('--create', action='store_true', default=False,
  121. help='Drop database and create a new one.')
  122. parser.add_argument('-m', '--metricname', nargs='?', default='value',
  123. help='Metric column name. Default: value')
  124. parser.add_argument('-tc', '--timecolumn', nargs='?', default='timestamp',
  125. help='Timestamp column name. Default: timestamp.')
  126. parser.add_argument('-tf', '--timeformat', nargs='?', default='%Y-%m-%d %H:%M:%S',
  127. help='Timestamp format. Default: \'%%Y-%%m-%%d %%H:%%M:%%S\' e.g.: 1970-01-01 00:00:00')
  128. parser.add_argument('-tz', '--timezone', default='UTC',
  129. help='Timezone of supplied data. Default: UTC')
  130. parser.add_argument('--fieldcolumns', nargs='?', default='value',
  131. help='List of csv columns to use as fields, separated by comma, e.g.: value1,value2. Default: value')
  132. parser.add_argument('--tagcolumns', nargs='?', default='host',
  133. help='List of csv columns to use as tags, separated by comma, e.g.: host,data_center. Default: host')
  134. parser.add_argument('-g', '--gzip', action='store_true', default=False,
  135. help='Compress before sending to influxdb.')
  136. parser.add_argument('-b', '--batchsize', type=int, default=5000,
  137. help='Batch size. Default: 5000.')
  138. args = parser.parse_args()
  139. loadCsv(args.input, args.server, args.user, args.password, args.dbname,
  140. args.metricname, args.timecolumn, args.timeformat, args.tagcolumns,
  141. args.fieldcolumns, args.gzip, args.delimiter, args.batchsize, args.create,
  142. args.timezone)