
Bulk load into PostgreSQL from log files using Python

Question:

This is a follow-up question. Below is a piece of my Python script that reads a constantly growing log file (text) and inserts data into a PostgreSQL DB. A new log file is generated each day. What I do is commit each line, which causes a huge load and really poor performance (it needs 4 hours to insert 30 minutes of the file's data!). How can I improve this code to insert in bulk instead of line by line? Would this help improve the performance and reduce load? I've read about copy_from but couldn't figure out how to use it in such a situation.

import psycopg2 as psycopg
import time
import logging

try:
    connectStr = "dbname='postgis20' user='postgres' password='' host='localhost'"
    cx = psycopg.connect(connectStr)
    cu = cx.cursor()
    logging.info("connected to DB")
except:
    logging.error("could not connect to the database")

file = open('textfile.log', 'r')
while 1:
    where = file.tell()
    line = file.readline()
    if not line:
        time.sleep(1)
        file.seek(where)
    else:
        print line,  # already has newline
        dodecode(line)

------------

def dodecode(fields):
    global cx
    from time import strftime, gmtime
    from calendar import timegm
    import os
    msg = fields.split(',')
    part = eval(msg[2])
    msgnum = int(msg[3:6])
    print "message#:", msgnum
    print fields
    if part == 1:
        if msgnum == 1:
            msg1 = msg_1.decode(bv)
            #print "message1 :", msg1
            Insert(msgnum, time, msg1)
        elif msgnum == 2:
            msg2 = msg_2.decode(bv)
            #print "message2 :", msg2
            Insert(msgnum, time, msg2)
        elif msgnum == 3:
            ....
            ....
            ....

----------------

def Insert(msgnum, time, msg):
    global cx
    try:
        if msgnum in [1, 2, 3]:
            if msg['type'] == 0:
                cu.execute("INSERT INTO table1 (messageid, timestamp, userid, position, text) "
                           "SELECT "+str(msgnum)+", '"+time+"', "+str(msg['UserID'])+", "
                           "ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")'), '"+text+"' "
                           "WHERE NOT EXISTS (SELECT * FROM table1 WHERE timestamp='"+time+"' AND text='"+text+"');")
                cu.execute("INSERT INTO table2 (field1, field2, field3, time_stamp, pos) "
                           "SELECT "+str(msg['UserID'])+", "+str(int(msg['UserName']))+", "+str(int(msg['UserIO']))+", '"+time+"', "
                           "ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")') "
                           "WHERE NOT EXISTS (SELECT * FROM table2 WHERE field1="+str(msg['UserID'])+");")
                cu.execute("UPDATE table2 SET field3="+str(int(msg['UserIO']))+", time_stamp='"+str(time)+"', "
                           "pos=ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")') "
                           "WHERE field1="+str(msg['UserID'])+" AND time_stamp < '"+str(time)+"';")
            elif msg['type'] == 1:
                cu.execute("INSERT INTO table1 (messageid, timestamp, userid, position, text) "
                           "SELECT "+str(msgnum)+", '"+time+"', "+str(msg['UserID'])+", "
                           "ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")'), '"+text+"' "
                           "WHERE NOT EXISTS (SELECT * FROM table1 WHERE timestamp='"+time+"' AND text='"+text+"');")
                cu.execute("INSERT INTO table2 (field1, field2, field3, time_stamp, pos) "
                           "SELECT "+str(msg['UserID'])+", "+str(int(msg['UserName']))+", "+str(int(msg['UserIO']))+", '"+time+"', "
                           "ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")') "
                           "WHERE NOT EXISTS (SELECT * FROM table2 WHERE field1="+str(msg['UserID'])+");")
                cu.execute("UPDATE table2 SET field3="+str(int(msg['UserIO']))+", time_stamp='"+str(time)+"', "
                           "pos=ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")') "
                           "WHERE field1="+str(msg['UserID'])+" AND time_stamp < '"+str(time)+"';")
            elif msg['type'] == 2:
                ....
                ....
                ....
    except Exception, err:
        #print('ERROR: %s\n' % str(err))
        logging.error('ERROR: %s\n' % str(err))
        cx.commit()
    cx.commit()

Answer1:

Doing multiple rows per transaction, and per query, will make it go faster.

When faced with a similar problem I put multiple rows in the values part of the insert query, but you have complicated insert queries, so you'll likely need a different approach.

I'd suggest creating a temporary table and inserting, say, 10000 rows into it with ordinary multi-row inserts:

insert into temptable values ( /* row1 data */ ), ( /* row2 data */ ), ...

500 rows per insert is a good starting point.

Then join the temp table against the existing data to de-dupe it:

delete from temptable using livetable where /* join condition */ ;

And de-dupe it against itself, if that is needed too:

delete from temptable where id not in ( select distinct on ( /* unique columns */) id from temptable);

Then use an insert-select to copy the rows from the temporary table into the live table:

insert into livetable ( /* columns */ ) select /* columns */ from temptable;

It looks like you might need an update-from too.

Finally, drop (or truncate) the temp table and start again.
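The cycle above can be sketched in Python — a minimal outline, assuming a psycopg2 connection `cx` and hypothetical names (temptable, livetable, and a timestamp/text uniqueness rule; none of these are taken from the original script):

```python
# One flush cycle for a single target table: de-dupe the staged batch
# against the live table and itself, copy the survivors over, then empty
# the temp table for the next batch. All names here are placeholders.

FLUSH_CYCLE = [
    # 1. drop staged rows that already exist in the live table
    "DELETE FROM temptable USING livetable "
    "WHERE temptable.timestamp = livetable.timestamp "
    "AND temptable.text = livetable.text",
    # 2. de-dupe the batch against itself, keeping one row per key
    "DELETE FROM temptable WHERE id NOT IN "
    "(SELECT DISTINCT ON (timestamp, text) id FROM temptable)",
    # 3. copy the surviving rows into the live table
    "INSERT INTO livetable (messageid, timestamp, userid, position, text) "
    "SELECT messageid, timestamp, userid, position, text FROM temptable",
    # 4. empty the temp table for the next batch
    "TRUNCATE temptable",
]

def flush_batch(cx):
    """Run one de-dupe-and-copy cycle in a single transaction."""
    cu = cx.cursor()
    for statement in FLUSH_CYCLE:
        cu.execute(statement)
    cx.commit()
```

With two live tables, as in the question, this cycle (with its own join condition and column list) would run once per table.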

As you're writing to two tables, you're going to need to double up all these operations.

I'd do the insert by maintaining a count and a list of values to insert, then at insert time building the query by repeating the (%s,%s,%s,%s) part as many times as needed, passing the list of values in separately and letting psycopg2 deal with the formatting.
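That placeholder-repetition approach can be sketched like this — a minimal example, with the table and column names as placeholders rather than anything from the original script:

```python
# Build one multi-row INSERT: the VALUES clause repeats a (%s, %s, ...)
# placeholder group once per buffered row, and psycopg2 substitutes the
# values safely when the statement is executed.

def build_multirow_insert(table, columns, rows):
    """Return (sql, params) for a single multi-row INSERT."""
    group = "(" + ", ".join(["%s"] * len(columns)) + ")"  # e.g. (%s, %s, %s)
    values = ", ".join([group] * len(rows))               # one group per row
    sql = "INSERT INTO {} ({}) VALUES {}".format(
        table, ", ".join(columns), values)
    params = [v for row in rows for v in row]             # flattened value list
    return sql, params

# Usage with an open psycopg2 cursor `cu` (not executed here):
#   sql, params = build_multirow_insert(
#       "temptable", ["messageid", "timestamp", "userid"], batch)
#   cu.execute(sql, params)
```

Recent psycopg2 versions also ship `psycopg2.extras.execute_values`, which implements the same idea for you.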

I'd expect making those changes could get you a speedup of 5 times or more.
