csv-to-influxdb.py

import argparse
import csv
import datetime
import json
import sys

from pytz import timezone
from influxdb import InfluxDBClient

epoch_naive = datetime.datetime.utcfromtimestamp(0)
epoch = timezone('UTC').localize(epoch_naive)

def unix_time_millis(dt):
    return int((dt - epoch).total_seconds() * 1000)
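
# Example (a sketch in comments, with hypothetical values, so the script's
# behavior is unchanged): one second past the epoch converts to 1000 ms.
#   unix_time_millis(timezone('UTC').localize(datetime.datetime(1970, 1, 1, 0, 0, 1)))
#   -> 1000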

##
## Check if data type of field is float
##
def isfloat(value):
    try:
        float(value)
        return True
    except (ValueError, TypeError):
        return False

def isbool(value):
    try:
        return value.lower() in ('true', 'false')
    except AttributeError:
        return False

def str2bool(value):
    return value.lower() == 'true'
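
# Examples of the type predicates (comments only, hypothetical values):
#   isfloat("20.5")  -> True     isfloat("abc")    -> False
#   isbool("TRUE")   -> True     isbool("1")       -> False
#   str2bool("True") -> True     str2bool("false") -> False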

##
## Check if data type of field is int
##
def isinteger(value):
    try:
        if float(value).is_integer():
            # Don't rely on .is_integer() alone, as it also returns True for a
            # float with zero decimals (e.g. 20.0). Check the raw string as well
            # and only report int if it contains no dot.
            if value.find('.') == -1:
                return True
            else:
                return False
        else:
            return False
    except (ValueError, TypeError):
        return False
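
# Examples (comments only, hypothetical values):
#   isinteger("20")   -> True
#   isinteger("20.0") -> False  (contains a dot, so it is treated as a float)
#   isinteger("abc")  -> False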

def loadCsv(inputfilename, servername, user, password, dbname, metric,
            timecolumn, timeformat, tagcolumns, fieldcolumns, usegzip,
            delimiter, batchsize, create, datatimezone, usessl, showdata,
            dryrun, datatypes, tspass):
    # Note: usegzip is accepted to match the --gzip flag but is currently unused.
    host = servername[0:servername.rfind(':')]
    port = int(servername[servername.rfind(':')+1:])

    if dryrun:
        showdata = True

    rpname = False
    if dbname.find('.') != -1:
        print("dbname contains a retention policy.")
        tmpdbname = dbname.split('.')
        dbname = tmpdbname[0]
        rpname = tmpdbname[1]
        print("dbname: " + dbname)
        print("rpname: " + rpname)
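
    # Example (hypothetical name, as a comment): --dbname mydb.one_year splits
    # into dbname "mydb" and retention policy "one_year".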

    client = InfluxDBClient(host, port, user, password, dbname, ssl=usessl)

    if create and not dryrun:
        print('Deleting database %s' % dbname)
        client.drop_database(dbname)
        print('Creating database %s' % dbname)
        client.create_database(dbname)
        client.switch_user(user, password)

    # format tags and fields
    if tagcolumns:
        tagcolumns = tagcolumns.split(',')
    if fieldcolumns:
        fieldcolumns = fieldcolumns.split(',')

    print()
    fields_datatypes = dict()
    if datatypes:
        tmpdatatypes = datatypes.split(',')
        print("specified data types:")
        for tmpdatatype in tmpdatatypes:
            dt = tmpdatatype.split('=')
            fields_datatypes[dt[0]] = dt[1]
            print("column '" + dt[0] + "' => " + dt[1])
    else:
        print("guessing data types from data in CSV row 2...")
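
    # Example (hypothetical flag value, as a comment): --datatypes value=float,host=str
    # yields fields_datatypes == {"value": "float", "host": "str"}.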

    # open csv
    datapoints = []
    count = 0
    with open(inputfilename, 'r') as csvfile:
        reader = csv.DictReader(csvfile, delimiter=delimiter)
        for row in reader:
            if showdata:
                print("Input: ", row)

            if not tspass:
                datetime_naive = datetime.datetime.strptime(row[timecolumn], timeformat)
                if datetime_naive.tzinfo is None:
                    datetime_local = timezone(datatimezone).localize(datetime_naive)
                else:
                    datetime_local = datetime_naive
                timestamp = unix_time_millis(datetime_local) * 1000000  # in nanoseconds
            else:
                timestamp = row[timecolumn]
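
            # Example (hypothetical row, as a comment): with the default time format
            # and UTC, "1970-01-01 00:00:01" becomes 1000 ms * 1000000 = 1000000000 ns.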

            tags = {}
            for t in tagcolumns:
                v = 0
                if t in row:
                    v = row[t]
                tags[t] = v

            fields = {}
            for f in fieldcolumns:
                v = 0
                if f in row:
                    skipfield = False
                    if count == 0 and not datatypes:
                        # first row: guess the data types ONLY from this row and
                        # remember them for all following rows
                        if isinteger(row[f]):
                            print("column '" + f + "' = '" + str(row[f]) + "' => int")
                            fields_datatypes[f] = "int"
                            v = int(float(row[f]))
                        elif isfloat(row[f]):
                            print("column '" + f + "' = '" + str(row[f]) + "' => float")
                            fields_datatypes[f] = "float"
                            v = float(row[f])
                        elif isbool(row[f]):
                            print("column '" + f + "' = '" + str(row[f]) + "' => bool")
                            fields_datatypes[f] = "bool"
                            v = str2bool(row[f])
                        else:
                            print("column '" + f + "' = '" + str(row[f]) + "' => str")
                            fields_datatypes[f] = "str"
                            v = row[f]
                    else:
                        # from the 2nd row on, only use the data types guessed from
                        # row 1. Check that each value matches its column's type and
                        # skip it if not (useful if the CSV has a few missing values).
                        if fields_datatypes[f] == "int":
                            if isinteger(row[f]):
                                v = int(float(row[f]))
                            else:
                                skipfield = True
                                print("CSV row " + str(count+2) + ": skipped field '" + f + "' as it has a different data type.")
                        elif fields_datatypes[f] == "float":
                            if isfloat(row[f]):
                                v = float(row[f])
                            else:
                                skipfield = True
                                print("CSV row " + str(count+2) + ": skipped field '" + f + "' as it has a different data type.")
                        elif fields_datatypes[f] == "bool":
                            if isbool(row[f]):
                                v = str2bool(row[f])
                            else:
                                skipfield = True
                                print("CSV row " + str(count+2) + ": skipped field '" + f + "' as it has a different data type.")
                        elif fields_datatypes[f] == "str":
                            v = row[f]

                    if not skipfield:
                        fields[f] = v

            if len(fields) > 0:
                point = {"measurement": metric, "time": timestamp, "fields": fields, "tags": tags}
                if showdata:
                    print("Output: ", json.dumps(point, indent=3))
                datapoints.append(point)
                count += 1
            else:
                print("CSV row " + str(count+2) + ": skipped as it contains no field values.")
                count += 1
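
            # Example point (comments only, hypothetical values):
            #   {"measurement": "cpu_load", "time": 1000000000,
            #    "fields": {"value": 0.5}, "tags": {"host": "server01"}}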

            if len(datapoints) > 0 and len(datapoints) % batchsize == 0:
                print('Read %d lines' % count)
                print('Inserting %d datapoints...' % (len(datapoints)))
                #if showdata:
                #    print(json.dumps(datapoints, indent=3))
                if not dryrun:
                    if rpname:
                        response = client.write_points(datapoints, retention_policy=rpname)
                    else:
                        response = client.write_points(datapoints)
                    if not response:
                        print('Problem inserting points, exiting...')
                        sys.exit(1)
                    # print the raw CSV timestamp here: datetime_local is not set
                    # when --tspass is given
                    print("Wrote %d points, up to %s, response: %s" % (len(datapoints), row[timecolumn], response))
                datapoints = []

    # write rest
    if len(datapoints) > 0:
        print('Read %d lines' % count)
        print('Inserting %d datapoints...' % (len(datapoints)))
        #if showdata:
        #    print(json.dumps(datapoints, indent=3))
        if not dryrun:
            if rpname:
                response = client.write_points(datapoints, retention_policy=rpname)
            else:
                response = client.write_points(datapoints)
            if not response:
                print('Problem inserting points, exiting...')
                sys.exit(1)
            print("Wrote %d, response: %s" % (len(datapoints), response))

    print('Done')
    if dryrun:
        print('(did not actually change anything in the database, as the --dryrun parameter was given.)')

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Csv to influxdb.')

    parser.add_argument('-i', '--input', nargs='?', required=True,
                        help='Input csv file.')
    parser.add_argument('-d', '--delimiter', nargs='?', required=False, default=',',
                        help='Csv delimiter. Default: \',\'.')
    parser.add_argument('-s', '--server', nargs='?', default='localhost:8086',
                        help='Server address. Default: localhost:8086')
    parser.add_argument('--ssl', action='store_true', default=False,
                        help='Use HTTPS instead of HTTP.')
    parser.add_argument('-u', '--user', nargs='?', default='root',
                        help='User name.')
    parser.add_argument('-p', '--password', nargs='?', default='root',
                        help='Password.')
    parser.add_argument('--dbname', nargs='?', required=True,
                        help='Database name. Specify a target retention policy as [DBNAME].[RPNAME]')
    parser.add_argument('--create', action='store_true', default=False,
                        help='Drop database and create a new one.')
    parser.add_argument('-m', '--metricname', nargs='?', default='value',
                        help='Metric column name. Default: value')
    parser.add_argument('-tc', '--timecolumn', nargs='?', default='timestamp',
                        help='Timestamp column name. Default: timestamp.')
    parser.add_argument('-tf', '--timeformat', nargs='?', default='%Y-%m-%d %H:%M:%S',
                        help='Timestamp format. Default: \'%%Y-%%m-%%d %%H:%%M:%%S\', e.g.: 1970-01-01 00:00:00')
    parser.add_argument('-tz', '--timezone', default='UTC',
                        help='Timezone of supplied data. Default: UTC')
    parser.add_argument('--fieldcolumns', nargs='?', default='value',
                        help='List of csv columns to use as fields, separated by commas, e.g.: value1,value2. Default: value')
    parser.add_argument('--tagcolumns', nargs='?', default='host',
                        help='List of csv columns to use as tags, separated by commas, e.g.: host,data_center. Default: host')
    parser.add_argument('-g', '--gzip', action='store_true', default=False,
                        help='Compress before sending to influxdb.')
    parser.add_argument('-b', '--batchsize', type=int, default=5000,
                        help='Batch size. Default: 5000.')
    parser.add_argument('--showdata', action='store_true', default=False,
                        help='Print detailed information about what will be done with the data (or would be done, when using --dryrun).')
    parser.add_argument('--dryrun', action='store_true', default=False,
                        help='Do not change anything in the DB. Also enables --showdata.')
    parser.add_argument('--datatypes', default=False,
                        help='Force data types for the fields given in --fieldcolumns, e.g.: value1=int,value2=float,value3=bool,name=str. Valid types: int, float, str, bool.')
    parser.add_argument('-tp', '--tspass', action='store_true', default=False,
                        help='Pass the timestamp from the CSV directly to InfluxDB (no conversion); use only if the format is compatible with InfluxDB.')

    args = parser.parse_args()
    loadCsv(args.input, args.server, args.user, args.password, args.dbname,
            args.metricname, args.timecolumn, args.timeformat, args.tagcolumns,
            args.fieldcolumns, args.gzip, args.delimiter, args.batchsize, args.create,
            args.timezone, args.ssl, args.showdata, args.dryrun, args.datatypes, args.tspass)
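
# Example invocation (hypothetical file, database, and column names):
#   python csv-to-influxdb.py -i data.csv -s localhost:8086 --dbname mydb \
#       -m cpu_load -tc timestamp --tagcolumns host --fieldcolumns value \
#       --dryrun --showdata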