
Critique this Python code (crawler with a thread pool)

Question:

How good is this Python code? I would welcome any criticism. There is also a bug in it: sometimes the script prints "ALL WAIT - CAN FINISH!" and then freezes (nothing further happens), and I can't find the reason why.

Site crawler with a thread pool:

import sys
from urllib import urlopen
from BeautifulSoup import BeautifulSoup, SoupStrainer
import re
from Queue import Queue, Empty
from threading import Thread

W_WAIT = 1
W_WORK = 0

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, pool, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()
        self.pool = pool
        self.state = None

    def is_wait(self):
        return self.state == W_WAIT

    def run(self):
        while True:
            #if all workers wait - time to exsit
            print "CHECK WAIT: !!! ",self.pool.is_all_wait()
            if self.pool.is_all_wait():
                print "ALL WAIT - CAN FINISH!"
                return
            try:
                func, args, kargs = self.tasks.get(timeout=3)
            except Empty:
                print "task wait timeout"
                continue
            self.state = W_WORK
            print "START !!! in thread %s" % str(self)
            #print args
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            print "!!! STOP in thread %s", str(self)
            self.tasks.task_done()
            self.state = W_WAIT
            #threads can fast empty it!
            #if self.tasks.qsize() == 0:
            #    print "QUIT!!!!!!"
            #    break

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        #self.tasks = Queue(num_threads)
        self.tasks = Queue()
        self.workers = []
        for _ in range(num_threads):
            self.workers.append(Worker(self,self.tasks))

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def is_all_wait(self):
        for w in self.workers:
            if not w.is_wait():
                return False
        return True

visited = set()
queue = Queue()
external_links_set = set()
internal_links_set = set()
external_links = 0

def process(pool,host,url):
    try:
        content = urlopen(url).read()
    except UnicodeDecodeError:
        return
    for link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')):
        try:
            href = link['href']
        except KeyError:
            continue
        if not href.startswith('http://'):
            href = 'http://%s%s' % (host, href)
        if not href.startswith('http://%s%s' % (host, '/')):
            continue
        internal_links_set.add(href)
        if href not in visited:
            visited.add(href)
            pool.add_task(process,pool,host,href)
        else:
            pass

def start(host,charset):
    pool = ThreadPool(20)
    pool.add_task(process,pool,host,'http://%s/' % (host))
    pool.wait_completion()

start('evgenm.com','utf8')

Thanks for the help! I wrote a new implementation. What can you say about this second version?

================================== TRY #2 =======================================

import sys
from urllib import urlopen
from BeautifulSoup import BeautifulSoup, SoupStrainer
import re
from Queue import Queue, Empty
from threading import Thread

W_STOP = 1

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, pool, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.pool = pool
        self.state = None
        self.start()

    def stop(self):
        self.state = W_STOP

    def run(self):
        while True:
            if self.state == W_STOP:
                print "\ncalled stop"
                break
            try:
                func, args, kargs = self.tasks.get(timeout=3)
            except Empty:
                continue
            print "\n***START*** %s" % str(self)
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            print "\n***STOP*** %s", str(self)
            self.tasks.task_done()

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        #self.tasks = Queue(num_threads)
        self.tasks = Queue()
        self.workers = []
        for _ in range(num_threads):
            self.workers.append(Worker(self,self.tasks))

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def stop_threads(self):
        for w in self.workers:
            w.stop()

    def wait_stop(self):
        self.wait_completion()
        self.stop_threads()

visited = set()
queue = Queue()
external_links_set = set()
internal_links_set = set()
external_links = 0

def process(pool,host,url):
    try:
        content = urlopen(url).read()
    except UnicodeDecodeError:
        return
    for link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')):
        try:
            href = link['href']
        except KeyError:
            continue
        if not href.startswith('http://'):
            href = 'http://%s%s' % (host, href)
        if not href.startswith('http://%s%s' % (host, '/')):
            continue
        internal_links_set.add(href)
        if href not in visited:
            visited.add(href)
            pool.add_task(process,pool,host,href)
        else:
            pass

def start(host,charset):
    pool = ThreadPool(20)
    pool.add_task(process,pool,host,'http://%s/' % (host))
    pool.wait_stop()

start('evgenm.com','utf8')

Answer1:

You are sharing state between threads (i.e., in is_all_wait) without synchronization. Plus, the fact that all threads are "waiting" is not a reliable indicator that the <em>queue</em> is empty (for instance, they could all be in the process of getting a task). I suspect that, occasionally, threads are exiting before the queue is truly empty. If this happens often enough, you will be left with tasks in the queue but no threads to run them. So queue.join() will wait forever.
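To make the second point concrete, here is a single-threaded walk-through of one iteration of the worker loop. It is only a sketch, written for Python 3 (where the `Queue` module is named `queue`); `W_WAIT` and `is_all_wait` mirror the question, while `step`, `noop`, and the `state` dictionary are hypothetical helpers introduced for the illustration.

```python
import queue

W_WAIT = 1
W_WORK = 0

tasks = queue.Queue()
state = {"worker-1": None}  # one worker is enough to show the problem


def noop():
    """Stand-in for a crawl task (hypothetical)."""


def step(name):
    """One iteration of the worker loop: take a task, run it, mark idle."""
    func = tasks.get()
    state[name] = W_WORK
    func()
    tasks.task_done()
    state[name] = W_WAIT


def is_all_wait():
    return all(s == W_WAIT for s in state.values())


for _ in range(3):
    tasks.put(noop)

step("worker-1")

# The worker is between tasks, so the flag check passes...
all_idle = is_all_wait()
# ...yet two tasks are still queued. A worker that exits at this point
# strands them, and tasks.join() would never return.
remaining = tasks.qsize()
print(all_idle, remaining)
```

With real threads the same snapshot can occur whenever every worker happens to be between `task_done()` and the next `get()`.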

My recommendation is:

<ol><li>Get rid of is_all_wait -- it's not a reliable indicator</li> <li>Get rid of the task state -- it's not really necessary</li> <li>Rely on queue.join to let you know when everything is processed</li> </ol>
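A minimal sketch of a pool that follows these three points, assuming Python 3 (`queue` instead of `Queue`). The `visit` function is a hypothetical stand-in for the crawler's `process`: each call queues two more tasks until a fixed depth, so no network access is needed to try it.

```python
import queue
import threading


class ThreadPool:
    """Pool of daemon threads consuming tasks from one shared queue."""

    def __init__(self, num_threads):
        self.tasks = queue.Queue()
        for _ in range(num_threads):
            threading.Thread(target=self._work, daemon=True).start()

    def _work(self):
        while True:
            func, args, kwargs = self.tasks.get()  # blocks until a task arrives
            try:
                func(*args, **kwargs)
            except Exception as exc:
                print("task failed:", exc)
            finally:
                # Runs only after func has returned, so any tasks it queued
                # are already counted when join() checks for completion.
                self.tasks.task_done()

    def add_task(self, func, *args, **kwargs):
        self.tasks.put((func, args, kwargs))

    def wait_completion(self):
        self.tasks.join()


# Toy stand-in for the crawler (hypothetical): fan out to depth 3.
seen = []
seen_lock = threading.Lock()


def visit(pool, depth):
    with seen_lock:
        seen.append(depth)
    if depth < 3:
        for _ in range(2):
            pool.add_task(visit, pool, depth + 1)


pool = ThreadPool(4)
pool.add_task(visit, pool, 0)
pool.wait_completion()
print(len(seen))
```

Because a task queues its children before its own `task_done()` call, the count of unfinished tasks cannot reach zero while work remains, so `join()` returns only once the whole tree has been processed. The workers keep no state flags at all.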

If you need to kill the threads (for example, this is part of a larger, long-running program), then do so after the queue.join().
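One common way to do that is to queue one sentinel per worker once `join()` has returned. The sketch below assumes Python 3; `_STOP` and `worker` are hypothetical names, and the sentinel approach is a substitute for the stop flag used in the question's second version, not something the question itself uses.

```python
import queue
import threading

_STOP = object()  # hypothetical sentinel: tells a worker to exit


def worker(tasks):
    while True:
        item = tasks.get()
        if item is _STOP:
            tasks.task_done()
            return
        func, args = item
        try:
            func(*args)
        finally:
            tasks.task_done()


tasks = queue.Queue()
threads = [threading.Thread(target=worker, args=(tasks,)) for _ in range(4)]
for t in threads:
    t.start()

results = []
for n in range(10):
    tasks.put((results.append, (n,)))

tasks.join()              # every real task has been processed
for _ in threads:
    tasks.put(_STOP)      # one sentinel per worker
for t in threads:
    t.join()              # wait for the threads themselves to exit

alive = [t.is_alive() for t in threads]
print(sorted(results), alive)
```

Compared with polling a stop flag between `get(timeout=3)` calls, a worker blocked in `get()` wakes as soon as its sentinel arrives, so shutdown does not have to wait for the timeout.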

Answer2:

I only have basic Python knowledge, but isn't threading in Python useless? I've seen tons of articles criticizing the global interpreter lock (GIL) for this.
