# csv-to-influxdb.py (12 KB)

  1. import requests
  2. import gzip
  3. import argparse
  4. import csv
  5. import datetime
  6. import json
  7. from pytz import timezone
  8. from influxdb import InfluxDBClient
  9. epoch_naive = datetime.datetime.utcfromtimestamp(0)
  10. epoch = timezone('UTC').localize(epoch_naive)
  11. def unix_time_millis(dt):
  12. return int((dt - epoch).total_seconds() * 1000)
  13. ##
  14. ## Check if data type of field is float
  15. ##
  16. def isfloat(value):
  17. try:
  18. float(value)
  19. return True
  20. except:
  21. return False
  22. def isbool(value):
  23. try:
  24. return value.lower() in ('true', 'false')
  25. except:
  26. return False
  27. def str2bool(value):
  28. return value.lower() == 'true'
  29. ##
  30. ## Check if data type of field is int
  31. ##
  32. def isinteger(value):
  33. try:
  34. if(float(value).is_integer()):
  35. # changed, don't only "guess" if it is int with .is_integer() as this also returns true for a float number with 0 decimal (ie 20.0)
  36. # instead we also check the actual data and do not return it is int if contains a dot
  37. if value.find('.') == -1:
  38. return True
  39. else:
  40. return False
  41. else:
  42. return False
  43. except:
  44. return False
  45. def loadCsv(inputfilename, servername, user, password, dbname, metric,
  46. timecolumn, timeformat, tagcolumns, fieldcolumns, usegzip,
  47. delimiter, batchsize, create, datatimezone, usessl, showdata, dryrun, datatypes, tspass):
  48. host = servername[0:servername.rfind(':')]
  49. port = int(servername[servername.rfind(':')+1:])
  50. if dryrun:
  51. showdata = True
  52. rpname = False
  53. if dbname.find('.') != -1:
  54. print("dbname contains a retention policy.")
  55. tmpdbname = dbname.split('.')
  56. dbname = tmpdbname[0]
  57. rpname = tmpdbname[1]
  58. print("dbname: " + dbname)
  59. print("rpname: " + rpname)
  60. client = InfluxDBClient(host, port, user, password, dbname, ssl=usessl)
  61. if(create == True and dryrun == False):
  62. print('Deleting database %s'%dbname)
  63. client.drop_database(dbname)
  64. print('Creating database %s'%dbname)
  65. client.create_database(dbname)
  66. client.switch_user(user, password)
  67. # format tags and fields
  68. if tagcolumns:
  69. tagcolumns = tagcolumns.split(',')
  70. if fieldcolumns:
  71. fieldcolumns = fieldcolumns.split(',')
  72. print()
  73. fields_datatypes = dict()
  74. if datatypes:
  75. tmpdatatypes = datatypes.split(',')
  76. print("specified data types:")
  77. for tmpdatatype in tmpdatatypes:
  78. dt = tmpdatatype.split('=')
  79. fields_datatypes[dt[0]] = dt[1]
  80. print("column '" + dt[0] + "' => " + dt[1])
  81. else:
  82. print("guessing data types from data in CSV row 2...")
  83. # open csv
  84. datapoints = []
  85. inputfile = open(inputfilename, 'r')
  86. count = 0
  87. with inputfile as csvfile:
  88. reader = csv.DictReader(csvfile, delimiter=delimiter)
  89. for row in reader:
  90. if showdata:
  91. print("Input: ", row)
  92. if not tspass:
  93. datetime_naive = datetime.datetime.strptime(row[timecolumn],timeformat)
  94. if datetime_naive.tzinfo is None:
  95. datetime_local = timezone(datatimezone).localize(datetime_naive)
  96. else:
  97. datetime_local = datetime_naive
  98. timestamp = unix_time_millis(datetime_local) * 1000000 # in nanoseconds
  99. else:
  100. timestamp = row[timecolumn]
  101. tags = {}
  102. for t in tagcolumns:
  103. v = 0
  104. if t in row:
  105. v = row[t]
  106. tags[t] = v
  107. fields = {}
  108. for f in fieldcolumns:
  109. v = 0
  110. if f in row:
  111. skipfield = False
  112. if count == 0 and not datatypes:
  113. # first row, guess data types ONLY from there and remember them for the following rows
  114. if (isinteger(row[f])):
  115. print("column '" + f + "' = '" + str(row[f]) + "' => int")
  116. fields_datatypes[f] = "int"
  117. v = int(float(row[f]))
  118. elif (isfloat(row[f])):
  119. print("column '" + f + "' = '" + str(row[f]) + "' => float")
  120. fields_datatypes[f] = "float"
  121. v = float(row[f])
  122. elif (isbool(row[f])):
  123. print("column '" + f + "' = '" + str(row[f]) + "' => bool")
  124. fields_datatypes[f] = "bool"
  125. v = str2bool(row[f])
  126. else:
  127. print("column '" + f + "' = '" + str(row[f]) + "' => str")
  128. fields_datatypes[f] = "str"
  129. v = row[f]
  130. else:
  131. # from 2nd row only use data types guessed from row 1.
  132. # check if datatype for each column fits and skip value if not (useful if there are a few missing values in the CSV)
  133. if (fields_datatypes[f] == "int"):
  134. if (isinteger(row[f])):
  135. v = int(float(row[f]))
  136. else:
  137. skipfield = True
  138. print("CSV row " + str(count+2) + ": skipped field '" + f + "' as it has a different data type.")
  139. elif (fields_datatypes[f] == "float"):
  140. if (isfloat(row[f])):
  141. v = float(row[f])
  142. else:
  143. skipfield = True
  144. print("CSV row " + str(count+2) + ": skipped field '" + f + "' as it has a different data type.")
  145. elif (fields_datatypes[f] == "bool"):
  146. if (isbool(row[f])):
  147. v = str2bool(row[f])
  148. else:
  149. skipfield = True
  150. print("CSV row " + str(count+2) + ": skipped field '", f, "' as it has a different data type.")
  151. elif (fields_datatypes[f] == "str"):
  152. v = row[f]
  153. if not skipfield:
  154. fields[f] = v
  155. if len(fields) > 0:
  156. point = {"measurement": metric, "time": timestamp, "fields": fields, "tags": tags}
  157. if showdata:
  158. print("Output: ", json.dumps(point, indent=3))
  159. datapoints.append(point)
  160. count+=1
  161. else:
  162. print("CSV row " + str(count+2) + ": skipped as it contains no field values.")
  163. count+=1
  164. if len(datapoints) % batchsize == 0:
  165. print('Read %d lines'%count)
  166. print('Inserting %d datapoints...'%(len(datapoints)))
  167. #if showdata:
  168. # print(json.dumps(datapoints, indent=3))
  169. if not dryrun:
  170. if rpname:
  171. response = client.write_points(datapoints, retention_policy=rpname)
  172. else:
  173. response = client.write_points(datapoints)
  174. if not response:
  175. print('Problem inserting points, exiting...')
  176. exit(1)
  177. print("Wrote %d points, up to %s, response: %s" % (len(datapoints), datetime_local, response))
  178. datapoints = []
  179. # write rest
  180. if len(datapoints) > 0:
  181. print('Read %d lines'%count)
  182. print('Inserting %d datapoints...'%(len(datapoints)))
  183. #if showdata:
  184. # print(json.dumps(datapoints, indent=3))
  185. if not dryrun:
  186. if rpname:
  187. response = client.write_points(datapoints, retention_policy=rpname)
  188. else:
  189. response = client.write_points(datapoints)
  190. if response == False:
  191. print('Problem inserting points, exiting...')
  192. exit(1)
  193. print("Wrote %d, response: %s" % (len(datapoints), response))
  194. print('Done')
  195. if dryrun:
  196. print('(actually did not change anything on the database, as --dryrun parameter was given.)')
  197. if __name__ == "__main__":
  198. parser = argparse.ArgumentParser(description='Csv to influxdb.')
  199. parser.add_argument('-i', '--input', nargs='?', required=True,
  200. help='Input csv file.')
  201. parser.add_argument('-d', '--delimiter', nargs='?', required=False, default=',',
  202. help='Csv delimiter. Default: \',\'.')
  203. parser.add_argument('-s', '--server', nargs='?', default='localhost:8086',
  204. help='Server address. Default: localhost:8086')
  205. parser.add_argument('--ssl', action='store_true', default=False,
  206. help='Use HTTPS instead of HTTP.')
  207. parser.add_argument('-u', '--user', nargs='?', default='root',
  208. help='User name.')
  209. parser.add_argument('-p', '--password', nargs='?', default='root',
  210. help='Password.')
  211. parser.add_argument('--dbname', nargs='?', required=True,
  212. help='Database name. Specify target Retention Policy: [DBNAME].[RPNAME]')
  213. parser.add_argument('--create', action='store_true', default=False,
  214. help='Drop database and create a new one.')
  215. parser.add_argument('-m', '--metricname', nargs='?', default='value',
  216. help='Metric column name. Default: value')
  217. parser.add_argument('-tc', '--timecolumn', nargs='?', default='timestamp',
  218. help='Timestamp column name. Default: timestamp.')
  219. parser.add_argument('-tf', '--timeformat', nargs='?', default='%Y-%m-%d %H:%M:%S',
  220. help='Timestamp format. Default: \'%%Y-%%m-%%d %%H:%%M:%%S\' e.g.: 1970-01-01 00:00:00')
  221. parser.add_argument('-tz', '--timezone', default='UTC',
  222. help='Timezone of supplied data. Default: UTC')
  223. parser.add_argument('--fieldcolumns', nargs='?', default='value',
  224. help='List of csv columns to use as fields, separated by comma, e.g.: value1,value2. Default: value')
  225. parser.add_argument('--tagcolumns', nargs='?', default='host',
  226. help='List of csv columns to use as tags, separated by comma, e.g.: host,data_center. Default: host')
  227. parser.add_argument('-g', '--gzip', action='store_true', default=False,
  228. help='Compress before sending to influxdb.')
  229. parser.add_argument('-b', '--batchsize', type=int, default=5000,
  230. help='Batch size. Default: 5000.')
  231. parser.add_argument('--showdata', action='store_true', default=False,
  232. help='Print detailed information to the console what will be done with the data (or is intended to, when using --dryrun).')
  233. parser.add_argument('--dryrun', action='store_true', default=False,
  234. help='Do not change anything in the DB. Also enables --showdata.')
  235. parser.add_argument('--datatypes', default=False,
  236. help='Force specify data types for fields specified in --fieldcolumns: value1=int,value2=float,value3=bool,name=str ... Valid types: int, float, str, bool')
  237. parser.add_argument('-tp', '--tspass', action='store_true', default=False,
  238. help='Pass the timestamp from CSV directly to InfluxDB (do no conversion) - use only if the format is compatible to InfluxDB.')
  239. args = parser.parse_args()
  240. loadCsv(args.input, args.server, args.user, args.password, args.dbname,
  241. args.metricname, args.timecolumn, args.timeformat, args.tagcolumns,
  242. args.fieldcolumns, args.gzip, args.delimiter, args.batchsize, args.create,
  243. args.timezone, args.ssl, args.showdata, args.dryrun, args.datatypes, args.tspass)