How to load a Word2Vec model and use its functions inside a mapper


I want to load a word2vec model and evaluate it on word analogy tasks (e.g. <em>a is to b as c is to something?</em>). To do this, I first load my w2v model:

<pre class="lang-py prettyprint-override">model = Word2VecModel.load(spark.sparkContext, str(sys.argv[1]))</pre>

Then I map a function over the questions file to evaluate the model:

<pre class="lang-py prettyprint-override">rdd_lines = spark.read.text("questions-words.txt").rdd.map(getAnswers)</pre>

The getAnswers function reads one line at a time from <em>questions-words.txt</em>, where each line contains a question and its expected answer (e.g. Athens Greece Baghdad Iraq, where a=Athens, b=Greece, c=Baghdad and something=Iraq). After reading a line, I build current_question and actual_answer (e.g. current_question=Athens Greece Baghdad and actual_answer=Iraq). Then I call the getAnalogy function, which computes the analogy (given the question, it returns the model's answer). Finally, I compare the model's answer with the actual answer, return the result (TRUE, FALSE or NOT FOUND) and write it to a text file.
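The parsing step described above can be sketched in plain Python as follows. parse_line is a hypothetical helper name, not part of the original code. It keeps the three question words as a list rather than joining them into one string, because getAnalogy indexes s[0], s[1] and s[2] as individual words; it also lowercases, assuming (as the answer below suggests) that the model was trained on lowercased text.

```python
def parse_line(line):
    """Split one analogy line 'a b c d' into ([a, b, c], d), lowercased.

    Hypothetical helper: assumes the model vocabulary is lowercase.
    """
    words = line.strip().lower().split()
    if len(words) != 4:
        # e.g. section headers such as ': capital-common-countries'
        raise ValueError("expected 4 words, got %d: %r" % (len(words), line))
    return words[:3], words[3]


question, answer = parse_line("Athens Greece Baghdad Iraq\n")
# question == ['athens', 'greece', 'baghdad'], answer == 'iraq'
```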

The problem is that I get the following exception:

<pre class="lang-py prettyprint-override">Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers.</pre>

I think it is thrown because I am using the model inside the map function. This <a href="https://stackoverflow.com/questions/34448456/using-word2vecmodel-transform-does-not-work-in-map-function" rel="nofollow">question</a> is similar to my problem, but I do not know how to apply its answer to my code. How can I solve this problem? Here is the full code:

<pre class="lang-py prettyprint-override">import sys

from pyspark.sql import SparkSession
from pyspark.mllib.feature import Word2VecModel


def getAnalogy(s, model):
    # s is a list of three words [a, b, c]: "a is to b as c is to ?"
    try:
        qry = model.transform(s[0]) - model.transform(s[1]) - model.transform(s[2])
        res = model.findSynonyms((-1)*qry, 5)  # return 5 "synonyms"
        res = [x[0] for x in res]
        for k in range(0, 3):
            if s[k] in res:
                res.remove(s[k])
        return res[0]
    except ValueError:
        return "NOT FOUND"


def getAnswers(text):
    # text is a Row with a single 'value' column holding one line of the file
    tmp = text[0].split(' ', 3)
    answer_list = []
    current_question = tmp[:3]  # the three question words, e.g. ['Athens', 'Greece', 'Baghdad']
    actual_answer = tmp[-1]
    # Referencing the driver-side `model` here is what triggers the SparkContext exception
    model_answer = getAnalogy(current_question, model)
    if model_answer == "NOT FOUND":
        answer_list.append("NOT FOUND\n")
    elif model_answer == actual_answer:
        answer_list.append("TRUE\n")
    else:
        answer_list.append("FALSE\n")
    return answer_list


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: my_test <model_path> <output_path>", file=sys.stderr)
        sys.exit(-1)
    spark = SparkSession\
        .builder\
        .appName("my_test")\
        .getOrCreate()
    model = Word2VecModel.load(spark.sparkContext, str(sys.argv[1]))
    rdd_lines = spark.read.text("questions-words.txt").rdd.map(getAnswers)
    dataframe = rdd_lines.toDF()
    dataframe.write.text(str(sys.argv[2]))
    spark.stop()
</pre>
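To make explicit what getAnalogy asks the model to do: since qry = a - b - c, the vector passed to findSynonyms is -qry = b + c - a, and the nearest words by cosine similarity (the score findSynonyms reports) are candidate answers, minus the three question words. The sketch below reproduces that arithmetic in pure Python over a toy word-to-vector dict. The names analogy and cosine and the 2-d vectors are made up for illustration; this is not MLlib's implementation.

```python
import math


def cosine(u, v):
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(u, v))
    norm_u = math.sqrt(sum(x * x for x in u))
    norm_v = math.sqrt(sum(y * y for y in v))
    if norm_u == 0.0 or norm_v == 0.0:
        return 0.0
    return dot / (norm_u * norm_v)


def analogy(vectors, a, b, c, topn=5):
    """Answer 'a is to b as c is to ?' with the word closest to b + c - a."""
    if any(w not in vectors for w in (a, b, c)):
        return "NOT FOUND"  # mirrors the ValueError branch of getAnalogy
    # b + c - a, i.e. the negation of qry = a - b - c in getAnalogy
    query = [vb + vc - va for va, vb, vc in zip(vectors[a], vectors[b], vectors[c])]
    # Rank the whole (toy) vocabulary by cosine similarity to the query vector
    ranked = sorted(vectors, key=lambda w: cosine(vectors[w], query), reverse=True)
    # Like getAnalogy: keep the top `topn`, then drop the three question words
    candidates = [w for w in ranked[:topn] if w not in (a, b, c)]
    return candidates[0] if candidates else "NOT FOUND"


# Toy 2-d "embeddings", made up purely for illustration
toy_vectors = {
    "athens": [1.0, 0.0],
    "greece": [1.0, 1.0],
    "baghdad": [3.0, 0.0],
    "iraq": [3.0, 1.0],
    "paris": [0.0, 3.0],
}
result = analogy(toy_vectors, "athens", "greece", "baghdad")  # 'iraq'
```

Because analogy only needs plain Python data, not the model object, a function like this has no dependency on the SparkContext; it is the model wrapper, not the arithmetic, that cannot leave the driver.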


As you have already suspected, you cannot use the model in a map function: the PySpark Word2VecModel is a thin wrapper around a JVM object that is accessed through the SparkContext, which exists only on the driver, so it cannot be shipped to the workers. On the other hand, questions-words.txt is not that big (~20K lines), so you are better off doing the evaluation on the driver with vanilla Python list comprehensions (this is essentially the first suggested answer in the question you have linked); it is not fast, but it is just a one-off task. Here is a way, using <a href="https://stackoverflow.com/questions/34172242/spark-word2vec-vector-mathematics/34298583#34298583" rel="nofollow">my getAnalogy function</a> as you have augmented it for error handling (notice that I have already removed the 'comment' lines from questions-words.txt, and that you should convert everything to lowercase, something you don't seem to be doing in your code):

<pre class="lang-py prettyprint-override">from pyspark.mllib.feature import Word2VecModel

# sc is the SparkContext (predefined in the pyspark shell)
model = Word2VecModel.load(sc, "word2vec/demo_200")  # model built with k=200

with open('/home/ctsats/word2vec/questions-words.txt') as f:
    lines = f.readlines()

lines2 = [x.lower() for x in lines]  # all to lowercase
lines3 = [x.strip('\n') for x in lines2]  # remove end-of-line characters
lines4 = [x.split(' ', 3) for x in lines3]

lines4[0]  # check:
# ['athens', 'greece', 'baghdad', 'iraq']


def getAnswers(text, model):
    actual_answer = text[-1]
    question = [text[0], text[1], text[2]]
    # getAnalogy as in the question; this runs on the driver, so using the model is fine
    model_answer = getAnalogy(question, model)
    if model_answer == "NOT FOUND":
        correct_answer = "NOT FOUND"
    elif model_answer == actual_answer:
        correct_answer = "TRUE"
    else:
        correct_answer = "FALSE"
    return text, model_answer, correct_answer
</pre>
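If you would rather not strip the comment lines by hand, the same preprocessing (lowercasing, stripping end-of-line characters, splitting into four words) can skip them on the fly. In questions-words.txt those lines are section headers that start with a colon, such as ": capital-common-countries". load_questions is a hypothetical helper name; its output has the same shape as lines4.

```python
def load_questions(lines):
    """Lowercase and split analogy lines, skipping ': section' headers and blanks.

    Hypothetical helper; returns lists like ['athens', 'greece', 'baghdad', 'iraq'].
    """
    questions = []
    for line in lines:
        line = line.strip()  # also removes '\r' from Windows line endings
        if not line or line.startswith(":"):
            continue  # section header (e.g. ': capital-common-countries') or blank line
        questions.append(line.lower().split(" ", 3))
    return questions


sample = [": capital-common-countries\n", "Athens Greece Baghdad Iraq\n", "\n"]
parsed = load_questions(sample)  # [['athens', 'greece', 'baghdad', 'iraq']]
```

With the real file you would pass f.readlines() instead of sample.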

So, your evaluation list can now be built as

<pre class="lang-py prettyprint-override">answer_list = [getAnswers(x, model) for x in lines4]</pre>

Here's an example for the first 20 entries (with a model of k=200):

<pre class="lang-py prettyprint-override">[(['athens', 'greece', 'baghdad', 'iraq'], u'turkey', 'FALSE'),
 (['athens', 'greece', 'bangkok', 'thailand'], u'turkey', 'FALSE'),
 (['athens', 'greece', 'beijing', 'china'], u'albania', 'FALSE'),
 (['athens', 'greece', 'berlin', 'germany'], u'germany', 'TRUE'),
 (['athens', 'greece', 'bern', 'switzerland'], u'liechtenstein', 'FALSE'),
 (['athens', 'greece', 'cairo', 'egypt'], u'albania', 'FALSE'),
 (['athens', 'greece', 'canberra', 'australia'], u'liechtenstein', 'FALSE'),
 (['athens', 'greece', 'hanoi', 'vietnam'], u'turkey', 'FALSE'),
 (['athens', 'greece', 'havana', 'cuba'], u'turkey', 'FALSE'),
 (['athens', 'greece', 'helsinki', 'finland'], u'finland', 'TRUE'),
 (['athens', 'greece', 'islamabad', 'pakistan'], u'turkey', 'FALSE'),
 (['athens', 'greece', 'kabul', 'afghanistan'], u'albania', 'FALSE'),
 (['athens', 'greece', 'london', 'england'], u'italy', 'FALSE'),
 (['athens', 'greece', 'madrid', 'spain'], u'portugal', 'FALSE'),
 (['athens', 'greece', 'moscow', 'russia'], u'russia', 'TRUE'),
 (['athens', 'greece', 'oslo', 'norway'], u'albania', 'FALSE'),
 (['athens', 'greece', 'ottawa', 'canada'], u'moldova', 'FALSE'),
 (['athens', 'greece', 'paris', 'france'], u'france', 'TRUE'),
 (['athens', 'greece', 'rome', 'italy'], u'italy', 'TRUE'),
 (['athens', 'greece', 'stockholm', 'sweden'], u'norway', 'FALSE')]</pre>
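Usually you want a single accuracy figure out of answer_list. In the 20 rows above, 5 are TRUE, i.e. 25%. The sketch below computes the same ratio over the whole list; summarize is a hypothetical helper, and reporting accuracy both over all questions and over only those the model could answer (excluding NOT FOUND) is one common convention, not the only one.

```python
from collections import Counter


def summarize(answer_list):
    """Count TRUE / FALSE / NOT FOUND labels in (text, model_answer, label) tuples."""
    counts = Counter(label for _, _, label in answer_list)
    total = len(answer_list)
    answered = total - counts["NOT FOUND"]  # Counter returns 0 for missing keys
    return {
        "counts": dict(counts),
        # float() keeps the division exact on Python 2 as well
        "accuracy_all": float(counts["TRUE"]) / total if total else 0.0,
        "accuracy_answered": float(counts["TRUE"]) / answered if answered else 0.0,
    }


# Two rows copied from the output above
example = [
    (['athens', 'greece', 'baghdad', 'iraq'], u'turkey', 'FALSE'),
    (['athens', 'greece', 'berlin', 'germany'], u'germany', 'TRUE'),
]
stats = summarize(example)  # accuracy_all == 0.5
```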


