Optimizing python code performance when importing zipped csv to a mongo collection
- by mark
I need to import a zipped csv into a mongo collection, but there is a catch - every record contains a timestamp in Pacific Time, which must be converted to the local time corresponding to the (longitude,latitude) pair found in the same record.
The code looks like so:
def read_csv_zip(path, timezones):
  with ZipFile(path) as z, z.open(z.namelist()[0]) as input:
    csv_rows = csv.reader(input)
    header = csv_rows.next()
    check,converters = get_aux_stuff(header)
    for csv_row in csv_rows:
      if check(csv_row):
        row = {
          converter[0]:converter[1](value) 
          for converter, value in zip(converters, csv_row) 
          if allow_field(converter)
        }
        ts = row['ts']
        lng, lat = row['loc']
        found_tz_entry = timezones.find_one(SON({'loc': {'$within': {'$box': [[lng-tz_lookup_radius, lat-tz_lookup_radius],[lng+tz_lookup_radius, lat+tz_lookup_radius]]}}}))
        if found_tz_entry:
          tz_name = found_tz_entry['tz']
          local_ts = ts.astimezone(timezone(tz_name)).replace(tzinfo=None)
          row['tz'] = tz_name
        else:
          local_ts = (ts.astimezone(utc) + timedelta(hours = int(lng/15))).replace(tzinfo = None)
        row['local_ts'] = local_ts
        yield row
def insert_documents(collection, source, batch_size):
  while True:
    items = list(itertools.islice(source, batch_size))
    if len(items) == 0:
      break;
    try:
      collection.insert(items)
    except:
      for item in items:
        try:
          collection.insert(item)
        except Exception as exc:
          print("Failed to insert record {0} - {1}".format(item['_id'], exc))
def main(zip_path):
  with Connection() as connection:
    data = connection.mydb.data
    timezones = connection.timezones.data
    insert_documents(data, read_csv_zip(zip_path, timezones), 1000)
The code proceeds as follows:
Every record read from the csv is checked and converted to a dictionary, where some fields may be skipped, some titles be renamed (from those appearing in the csv header), some values may be converted (to datetime, to integers, to floats. etc ...)
For each record read from the csv, a lookup is made into the timezones collection to map the record location to the respective time zone. If the mapping is successful - that timezone is used to convert the record timestamp (pacific time) to the respective local timestamp. If no mapping is found - a rough approximation is calculated.
The timezones collection is appropriately indexed, of course - calling explain() confirms it.
The process is slow. Naturally, having to query the timezones collection for every record kills the performance. I am looking for advises on how to improve it. 
Thanks.
EDIT
The timezones collection contains 8176040 records, each containing four values:
> db.data.findOne()
{ "_id" : 3038814, "loc" : [ 1.48333, 42.5 ], "tz" : "Europe/Andorra" }
EDIT2
OK, I have compiled a release build of http://toblerity.github.com/rtree/ and configured the rtree package. Then I have created an rtree dat/idx pair of files corresponding to my timezones collection. So, instead of calling collection.find_one I call index.intersection. Surprisingly, not only there is no improvement, but it works even more slowly now! May be rtree could be fine tuned to load the entire dat/idx pair into RAM (704M), but I do not know how to do it. Until then, it is not an alternative.
In general, I think the solution should involve parallelization of the task.
EDIT3
Profile output when using collection.find_one:
>>> p.sort_stats('cumulative').print_stats(10)
Tue Apr 10 14:28:39 2012    ImportDataIntoMongo.profile
         64549590 function calls (64549180 primitive calls) in 1231.257 seconds
   Ordered by: cumulative time
   List reduced from 730 to 10 due to restriction <10>
   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.012    0.012 1231.257 1231.257 ImportDataIntoMongo.py:1(<module>)
        1    0.001    0.001 1230.959 1230.959 ImportDataIntoMongo.py:187(main)
        1  853.558  853.558  853.558  853.558 {raw_input}
        1    0.598    0.598  370.510  370.510 ImportDataIntoMongo.py:165(insert_documents)
   343407    9.965    0.000  359.034    0.001 ImportDataIntoMongo.py:137(read_csv_zip)
   343408    2.927    0.000  287.035    0.001 c:\python27\lib\site-packages\pymongo\collection.py:489(find_one)
   343408    1.842    0.000  274.803    0.001 c:\python27\lib\site-packages\pymongo\cursor.py:699(next)
   343408    2.542    0.000  271.212    0.001 c:\python27\lib\site-packages\pymongo\cursor.py:644(_refresh)
   343408    4.512    0.000  253.673    0.001 c:\python27\lib\site-packages\pymongo\cursor.py:605(__send_message)
   343408    0.971    0.000  242.078    0.001 c:\python27\lib\site-packages\pymongo\connection.py:871(_send_message_with_response)
Profile output when using index.intersection:
>>> p.sort_stats('cumulative').print_stats(10)
Wed Apr 11 16:21:31 2012    ImportDataIntoMongo.profile
         41542960 function calls (41542536 primitive calls) in 2889.164 seconds
   Ordered by: cumulative time
   List reduced from 778 to 10 due to restriction <10>
   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.028    0.028 2889.164 2889.164 ImportDataIntoMongo.py:1(<module>)
        1    0.017    0.017 2888.679 2888.679 ImportDataIntoMongo.py:202(main)
        1 2365.526 2365.526 2365.526 2365.526 {raw_input}
        1    0.766    0.766  502.817  502.817 ImportDataIntoMongo.py:180(insert_documents)
   343407    9.147    0.000  491.433    0.001 ImportDataIntoMongo.py:152(read_csv_zip)
   343406    0.571    0.000  391.394    0.001 c:\python27\lib\site-packages\rtree-0.7.0-py2.7.egg\rtree\index.py:384(intersection)
   343406  379.957    0.001  390.824    0.001 c:\python27\lib\site-packages\rtree-0.7.0-py2.7.egg\rtree\index.py:435(_intersection_obj)
   686513   22.616    0.000   38.705    0.000 c:\python27\lib\site-packages\rtree-0.7.0-py2.7.egg\rtree\index.py:451(_get_objects)
   343406    6.134    0.000   33.326    0.000 ImportDataIntoMongo.py:162(<dictcomp>)
      346    0.396    0.001   30.665    0.089 c:\python27\lib\site-packages\pymongo\collection.py:240(insert)
EDIT4
I have parallelized the code, but the results are still not very encouraging. I am convinced it could be done better. See my own answer to this question for details.