csv-to-influxdb.py

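"""Load a CSV file into InfluxDB.

Drops and recreates the target database, reads the CSV with csv.DictReader,
converts the timestamp column to nanosecond precision, maps the configured
columns onto InfluxDB tags and fields, and writes the points in batches.
"""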
import argparse
import csv
import datetime

from influxdb import InfluxDBClient

epoch = datetime.datetime.utcfromtimestamp(0)


def unix_time_millis(dt):
    # Milliseconds since the Unix epoch for a naive UTC datetime.
    return int((dt - epoch).total_seconds() * 1000)
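# Worked example of the conversion above: for dt = 1970-01-01 00:00:10 UTC,
# unix_time_millis returns 10 * 1000 = 10000 ms; loadCsv below multiplies by
# 1,000,000 to get 10,000,000,000 ns, the integer precision InfluxDB expects.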
def loadCsv(inputfilename, servername, user, password, dbname, metric,
            timecolumn, timeformat, tagcolumns, fieldcolumns, usegzip,
            delimiter, batchsize):
    # Split "host:port" into its parts.
    host = servername[0:servername.rfind(':')]
    port = int(servername[servername.rfind(':') + 1:])
    client = InfluxDBClient(host, port, user, password, dbname)

    # Note: this is destructive -- any existing database with this name is dropped.
    print('Deleting database %s' % dbname)
    client.drop_database(dbname)
    print('Creating database %s' % dbname)
    client.create_database(dbname)
    client.switch_user(user, password)

    # Split the comma-separated tag and field column lists.
    if tagcolumns:
        tagcolumns = tagcolumns.split(',')
    if fieldcolumns:
        fieldcolumns = fieldcolumns.split(',')

    # Read the CSV and build one point per row.
    datapoints = []
    count = 0
    with open(inputfilename, 'r') as csvfile:
        reader = csv.DictReader(csvfile, delimiter=delimiter)
        for row in reader:
            # InfluxDB expects integer timestamps in nanoseconds.
            timestamp = unix_time_millis(
                datetime.datetime.strptime(row[timecolumn], timeformat)) * 1000000

            tags = {}
            for t in tagcolumns:
                tags[t] = row.get(t, 0)

            fields = {}
            for f in fieldcolumns:
                # Store field values as floats so InfluxDB treats them as numbers.
                fields[f] = float(row[f]) if f in row else 0.0

            datapoints.append({"measurement": metric, "time": timestamp,
                               "fields": fields, "tags": tags})

            count += 1
            if count % batchsize == 0:
                print('Read %d lines' % count)
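    # Shape of a point as built above (values illustrative, not from the source):
    #   {"measurement": "cpu", "time": 1451606400000000000,
    #    "fields": {"value": 0.64}, "tags": {"host": "server01"}}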
    # Write the accumulated points in batches of at most batchsize.
    start = 0
    end = min(count, batchsize)
    while start < count:
        data = datapoints[start:end]
        print('Inserting datapoints...')
        response = client.write_points(data)
        print('Wrote %d, response: %s' % (end - start, response))
        start += batchsize
        end = min(count, end + batchsize)
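
# A minimal sketch of how the -g/--gzip flag could be honored; the script above
# accepts usegzip but never uses it. This assumes influxdb-python 5.2.0+, whose
# InfluxDBClient constructor takes a gzip keyword that compresses request bodies.
# make_gzip_client is a hypothetical helper, not part of the original script.
def make_gzip_client(host, port, user, password, dbname, usegzip):
    if usegzip:
        return InfluxDBClient(host, port, user, password, dbname, gzip=True)
    return InfluxDBClient(host, port, user, password, dbname)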
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='CSV to InfluxDB.')
    parser.add_argument('-i', '--input', nargs='?', required=True,
                        help='Input CSV file.')
    parser.add_argument('-d', '--delimiter', nargs='?', required=False, default=',',
                        help='CSV delimiter. Default: \',\'.')
    parser.add_argument('-s', '--server', nargs='?', default='localhost:8086',
                        help='Server address. Default: localhost:8086')
    parser.add_argument('-u', '--user', nargs='?', default='root',
                        help='User name.')
    parser.add_argument('-p', '--password', nargs='?', default='root',
                        help='Password.')
    parser.add_argument('--dbname', nargs='?', required=True,
                        help='Database name.')
    parser.add_argument('-m', '--metricname', nargs='?', default='value',
                        help='Measurement (metric) name. Default: value')
    parser.add_argument('-tc', '--timecolumn', nargs='?', default='timestamp',
                        help='Timestamp column name. Default: timestamp.')
    parser.add_argument('-tf', '--timeformat', nargs='?', default='%Y-%m-%d %H:%M:%S',
                        help='Timestamp format. Default: \'%%Y-%%m-%%d %%H:%%M:%%S\', e.g.: 1970-01-01 00:00:00')
    parser.add_argument('--fieldcolumns', nargs='?', default='value',
                        help='List of CSV columns to use as fields, separated by commas, e.g.: value1,value2. Default: value')
    parser.add_argument('--tagcolumns', nargs='?', default='host',
                        help='List of CSV columns to use as tags, separated by commas, e.g.: host,data_center. Default: host')
    parser.add_argument('-g', '--gzip', action='store_true', default=False,
                        help='Compress before sending to InfluxDB (currently not implemented).')
    parser.add_argument('-b', '--batchsize', type=int, default=5000,
                        help='Batch size. Default: 5000.')
    args = parser.parse_args()
    loadCsv(args.input, args.server, args.user, args.password, args.dbname,
            args.metricname, args.timecolumn, args.timeformat, args.tagcolumns,
            args.fieldcolumns, args.gzip, args.delimiter, args.batchsize)
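
# Example invocation (database, measurement, and column names are illustrative):
#
#   python csv-to-influxdb.py -i metrics.csv --dbname mydb -m cpu \
#       -tc timestamp -tf '%Y-%m-%d %H:%M:%S' \
#       --tagcolumns host,data_center --fieldcolumns value -b 5000
#
# against a CSV such as:
#
#   timestamp,host,data_center,value
#   2016-01-01 00:00:00,server01,us-east,0.64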