
Idea: speed up the parallelization of CountVectorizer by adding a batch mechanism to joblib.Parallel #1401

Closed · wants to merge 3 commits
38 changes: 34 additions & 4 deletions sklearn/externals/joblib/parallel.py
@@ -291,7 +291,7 @@ class Parallel(Logger):
[Parallel(n_jobs=2)]: Done 5 out of 6 | elapsed: 0.0s remaining: 0.0s
[Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s finished
'''
def __init__(self, n_jobs=1, verbose=0, pre_dispatch='all'):
def __init__(self, n_jobs=1, verbose=0, pre_dispatch='all', batch_size=1):
self.verbose = verbose
self.n_jobs = n_jobs
self.pre_dispatch = pre_dispatch
@@ -303,6 +303,7 @@ def __init__(self, n_jobs=1, verbose=0, pre_dispatch='all'):
# A flag used to abort the dispatching of jobs in case an
# exception is found
self._aborting = False
self.batch_size = batch_size  # number of consecutive tasks grouped into one dispatched JobBatch; 1 disables batching

def dispatch(self, func, args, kwargs):
""" Queue the function for computing, with or without multiprocessing
@@ -510,8 +511,20 @@ def __call__(self, iterable):
self._start_time = time.time()
self.n_dispatched = 0
try:
for function, args, kwargs in iterable:
self.dispatch(function, args, kwargs)
if self.batch_size > 1:
jobs = [] # job accumulator
for function, args, kwargs in iterable:
if len(jobs) == self.batch_size: # accumulator has reached batch_size:
batch = JobBatch(jobs) # it's time to dispatch it
self.dispatch(batch, (), {})
jobs = []
jobs.append((function, args, kwargs))
if jobs: # dispatch remaining jobs (possibly as incomplete batch)
batch = JobBatch(jobs)
self.dispatch(batch, (), {})
else: # normal non-batch mechanism
for function, args, kwargs in iterable:
self.dispatch(function, args, kwargs)

self.retrieve()
# Make sure that we get a last message telling us we are done
@@ -528,9 +541,26 @@
self._pool.join()
os.environ.pop('__JOBLIB_SPAWNED_PARALLEL__', 0)
self._jobs = list()
output = self._output
if self.batch_size > 1: # flatten the results obtained from JobBatches
output = [r for res in self._output for r in res]
else:
output = self._output
self._output = None
return output

def __repr__(self):
return '%s(n_jobs=%s)' % (self.__class__.__name__, self.n_jobs)


###############################################################################
# Job batch to be executed by a single process (this callable pattern is used
# because multiprocessing.Pool can only dispatch picklable functions)
class JobBatch:

def __init__(self, jobs):
self.jobs = jobs

def __call__(self):
# execute the jobs sequentially, and return their results as a list,
# which will need to be flattened in the end
return [func(*args, **kwargs) for func, args, kwargs in self.jobs]
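
For illustration, a minimal usage sketch of the batch_size parameter added above, assuming the patched joblib from this pull request (the sqrt workload and the parameter values are arbitrary):

from math import sqrt
from sklearn.externals.joblib import Parallel, delayed

# Group 50 consecutive tasks into one JobBatch per dispatch, so each worker
# process unpickles one callable per batch instead of one per task.
batched = Parallel(n_jobs=2, batch_size=50)(
    delayed(sqrt)(i) for i in range(1000))

# Because __call__ flattens the per-batch result lists, the output is the same
# as with the default batch_size=1, only dispatched in fewer, larger chunks.
plain = Parallel(n_jobs=2)(delayed(sqrt)(i) for i in range(1000))
assert batched == plain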
59 changes: 51 additions & 8 deletions sklearn/feature_extraction/text.py
@@ -16,6 +16,7 @@
import unicodedata
import warnings
import numbers
from ..externals.joblib import Parallel, delayed

import numpy as np
import scipy.sparse as sp
@@ -84,6 +85,41 @@ def _check_stop_list(stop):
return stop


###############################################################################
# These two functions are required for the joblib parallelization of the
# CV's analyze function. Since multiprocessing.Pool was causing me some
# trouble with the pickling of instance members and lambda functions, I cut it
# short by simply extracting the logic for a single case (word ngrams),
# with some default parameters hardcoded. Hence, this is NOT meant to be a
# complete solution, just the minimal code for my proof of concept.

def _word_ngrams_single(tokens, stop_words=None):
"""Turn tokens into a sequence of n-grams after stop words filtering"""
# handle stop words
if stop_words is not None:
tokens = [w for w in tokens if w not in stop_words]

# handle token n-grams
min_n, max_n = (1, 1)  # self.ngram_range hardcoded to unigrams for this proof of concept
if max_n != 1:
Contributor: I don't get it. max_n will always be 1, since you set it that way on the line before?

Contributor (author): As I say in the comment just above, this whole PR is absolutely not meant to be a complete solution. It's just a tentative proof of concept, studying the idea for a single case (I should have said "unigrams" or "1-grams" instead of "word ngrams"), hence the hardcoding of certain parameters.

original_tokens = tokens
tokens = []
n_original_tokens = len(original_tokens)
for n in xrange(min_n,
min(max_n + 1, n_original_tokens + 1)):
for i in xrange(n_original_tokens - n + 1):
tokens.append(u" ".join(original_tokens[i: i + n]))

return tokens

def _analyze_single(doc):
token_pattern = re.compile(ur"(?u)\b\w\w+\b")
return _word_ngrams_single(token_pattern.findall(doc.decode('utf-8', 'strict')))
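
# For concreteness, the hardcoded unigram analyzer above behaves roughly like
# this (the sample document is made up; tokens shorter than two characters are
# dropped by the \b\w\w+\b pattern, and no higher-order n-grams are built
# because max_n is fixed to 1):
#
#     >>> _analyze_single("speed up the CountVectorizer with joblib batches")
#     [u'speed', u'up', u'the', u'CountVectorizer', u'with', u'joblib', u'batches']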


###############################################################################


class CountVectorizer(BaseEstimator):
"""Convert a collection of raw documents to a matrix of token counts

@@ -432,7 +468,7 @@ def fit(self, raw_documents, y=None):
self.fit_transform(raw_documents)
return self

def fit_transform(self, raw_documents, y=None):
def fit_transform(self, raw_documents, y=None, n_jobs=1, batch_size=1):
Contributor: n_jobs and batch_size should be set in the constructor instead. It would also be nice if you could include a docstring for both of them; it might not be clear to everyone what batch_size means.
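
A hypothetical sketch of that suggestion (not part of this patch): the two knobs would become CountVectorizer constructor parameters, read from self by fit_transform, and documented in the numpydoc Parameters section along these lines:

    n_jobs : int, optional, 1 by default
        Number of worker processes passed to joblib.Parallel when the
        documents are analyzed.
    batch_size : int, optional, 1 by default
        Number of documents grouped into a single dispatched job; values
        greater than 1 amortize the per-task dispatch overhead of
        multiprocessing.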

"""Learn the vocabulary dictionary and return the count vectors

This is more efficient than calling fit followed by transform.
@@ -467,15 +503,22 @@ def fit_transform(self, raw_documents, y=None):

analyze = self.build_analyzer()

# TODO: parallelize the following loop with joblib?
# (see XXX up ahead)
for doc in raw_documents:
term_count_current = Counter(analyze(doc))
term_counts.update(term_count_current)

# Let's see if we can gain some speed by introducing a job batch mechanism
for analysis in Parallel(n_jobs=n_jobs,
batch_size=batch_size)(delayed(_analyze_single)(doc)
for doc in raw_documents):
term_count_current = Counter(analysis)
term_counts.update(term_count_current)
document_counts.update(term_count_current.iterkeys())

term_counts_per_doc.append(term_count_current)

# TODO: parallelize the following loop with joblib?
# (see XXX up ahead)
# for doc in raw_documents:
# term_count_current = Counter(analyze(doc))
# term_counts.update(term_count_current)
# document_counts.update(term_count_current.iterkeys())
# term_counts_per_doc.append(term_count_current)

n_doc = len(term_counts_per_doc)
max_features = self.max_features
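
To make the intended use of the patched fit_transform concrete, a minimal sketch (the toy corpus and the n_jobs / batch_size values are made up, and only the hardcoded word-unigram analysis above is supported by this proof of concept):

from sklearn.feature_extraction.text import CountVectorizer

# With batch_size=100, each worker process analyzes 100 documents per
# dispatched JobBatch instead of receiving one document per job.
docs = ["the quick brown fox jumps over the lazy dog",
        "speed up CountVectorizer with joblib batches"] * 5000

cv = CountVectorizer()
X = cv.fit_transform(docs, n_jobs=2, batch_size=100)
print(X.shape)  # (n_documents, n_features)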