Lanaro G. - Python High Performance, Second Edition - 2017
https://github.com/WillBrennan
Chapter 1
Chapter 2
Chapter 3
Chapter 4
Chapter 5
Chapter 6
Chapter 7
Chapter 8
Chapter 9
https://www.continuum.io/downloads
ParticleSimulator.evolve_numpy
ParticleSimulator.evolve_python
def square(x):
return x * x
inputs = [0, 1, 2, 3, 4]
outputs = pool.map(square, inputs)
def square(x):
return x * x
inputs = [0, 1, 2, 3, 4]
https://github.com/PacktPublishing/Python-High-Performance-Second-Edition
https://github.com/PacktPublishing/
https://www.packtpub.com/sites/default/files/downloads/PythonHighPerformanceSecondEdition_ColorImages.pdf
cProfile line_profiler
memory_profiler
time
timeit
pytest
cProfile
line_profiler memory_profiler
dis
(0, 0)
Particle
x y ang_vel
class Particle:
def __init__(self, x, y, ang_vel):
self.x = x
self.y = y
self.ang_vel = ang_vel
ang_vel
ParticleSimulator
__init__
Particle evolve
x=0
y=0
v_x
v_y
ParticleSimulator
class ParticleSimulator:
    def __init__(self, particles):
        self.particles = particles

    def evolve(self, dt):
        timestep = 0.00001
        nsteps = int(dt/timestep)
        for i in range(nsteps):
            for p in self.particles:
                # 1. calculate the direction
                norm = (p.x**2 + p.y**2)**0.5
                v_x = -p.y/norm
                v_y = p.x/norm
                # 2. calculate the displacement
                d_x = timestep * p.ang_vel * v_x
                d_y = timestep * p.ang_vel * v_y
                p.x += d_x
                p.y += d_y
                # 3. repeat for all the time steps
matplotlib
pip install
matplotlib
https://store.continuum.io/cshop/anaconda/
matplotlib
matplotlib.pyplot.plot
matplotlib.animation.FuncAnimation
visualize ParticleSimulator
matplotlib
plot plot
init animate
line.set_data
FuncAnimation init animate
interval blit
plt.show()
from matplotlib import pyplot as plt
from matplotlib import animation

def visualize(simulator):
    X = [p.x for p in simulator.particles]
    Y = [p.y for p in simulator.particles]

    fig = plt.figure()
    ax = plt.subplot(111, aspect='equal')
    line, = ax.plot(X, Y, 'ro')

    # Axis limits
    plt.xlim(-1, 1)
    plt.ylim(-1, 1)

    def init():
        line.set_data([], [])
        return line,

    def animate(i):
        # We let the particle evolve for 0.01 time units
        simulator.evolve(0.01)
        X = [p.x for p in simulator.particles]
        Y = [p.y for p in simulator.particles]
        line.set_data(X, Y)
        return line,

    # Call the animate function every 10 ms
    anim = animation.FuncAnimation(fig, animate, init_func=init,
                                   blit=True, interval=10)
    plt.show()
test_visualize
def test_visualize():
particles = [Particle(0.3, 0.5, 1),
Particle(0.0, -0.5, -1),
Particle(-0.1, -0.4, 3)]
simulator = ParticleSimulator(particles)
visualize(simulator)
if __name__ == '__main__':
test_visualize()
test_visualize
evolve test_evolve
test_evolve
fequal
def test_evolve():
particles = [Particle( 0.3, 0.5, +1),
Particle( 0.0, -0.5, -1),
Particle(-0.1, -0.4, +3)]
simulator = ParticleSimulator(particles)
simulator.evolve(0.1)
if __name__ == '__main__':
test_evolve()
Particle
ParticleSimulator
from random import uniform

def benchmark():
    particles = [Particle(uniform(-1.0, 1.0),
                          uniform(-1.0, 1.0),
                          uniform(-1.0, 1.0))
                 for i in range(1000)]
    simulator = ParticleSimulator(particles)
    simulator.evolve(0.1)

if __name__ == '__main__':
    benchmark()
time
time
time
time cygwin
http://www.cygwin.com/
Measure-Command https://msdn.microsoft.com/en-us/powershell/reference/5.1/microsoft.powershell.utility/measure-command
time
real
user
sys
time
user
real
timeit
3
timeit
timeit
%
%%
pip
ipython
ipython qtconsole
jupyter notebook
-n -r
timeit timeit
-s
timeit
n
number
time
pytest
pytest-benchmark
pytest
pytest
http://doc.pytest.org/en/latest/,
pytest
test_simul.py test_evolve
def test_evolve():
particles = [Particle( 0.3, 0.5, +1),
Particle( 0.0, -0.5, -1),
Particle(-0.1, -0.4, +3)]
simulator = ParticleSimulator(particles)
simulator.evolve(0.1)
pytest
pytest
path/to/module.py::function_name test_evolve
pytest-benchmark test
benchmark pytest
benchmark pytest
ParticleSimulator.evolve
def test_evolve(benchmark):
    # ... previous code
    benchmark(simulator.evolve, 0.1)
pytest
test_simul.py::test_evolve
test_evolve
pytest-benchmark
test_evolve 34 Rounds
29 41 Min Max Average Median
30
time
pytest-benchmark
http://pytest-benchmark.readthedocs.io/en/stable/usage.html
profile
cProfile
profile
cProfile
cProfile
cProfile
-s
tottime
cProfile -o
cProfile stats
-o
cProfile.run
cProfile.run("benchmark()")
cProfile.Profile
pr = cProfile.Profile()
pr.enable()
benchmark()
pr.disable()
pr.print_stats()
cProfile %prun
cProfile
ncalls
tottime
cumtime
percall
filename:lineno
tottime
evolve
cProfile
line_profiler
cProfile
cProfile
http://sourceforge.net/projects/qcachegrindwin/
http://www.macports.org/
http://blogs.perl.org/users/rurban/2013/04/install-kachegrind-on-macosx-with-ports.html
cProfile
pyprof2calltree cProfile
pyprof2calltree
pip install pyprof2calltree
recursive factorial
factorial taylor_exp taylor_sin
exp(x) sin(x)
def factorial(n):
if n == 0:
return 1.0
else:
return n * factorial(n-1)
def taylor_exp(n):
return [1.0/factorial(i) for i in range(n)]
def taylor_sin(n):
res = []
for i in range(n):
if i % 2 == 1:
res.append((-1)**((i-1)/2)/float(factorial(i)))
else:
res.append(0.0)
return res
def benchmark():
taylor_exp(500)
taylor_sin(500)
if __name__ == '__main__':
benchmark()
cProfile
pyprof2calltree
cProfile
cProfile cumtime tottime
factorial
taylor_exp
taylor_sin
taylor_exp factorial taylor_sin
factorial
taylor_exp
taylor_exp
https://github.com/jrfonseca/gprof2dot
.dot
line_profiler
line_profiler
https://github.com/rkern/line_profiler
line_profiler @profile
profile
kernprof.py
@profile evolve
@profile
def evolve(self, dt):
# code
kernprof.py
-l line_profiler
-v
kernprof.py
line_profiler lprun
@profile
Line #    Hits    Time    Per Hit    % Time    Line Contents
for
r alpha
x y
x = r * cos(alpha)
y = r * sin(alpha)
timestep * p.ang_vel
v_x = (-p.y)/norm
import dis
from simul import ParticleSimulator
dis.dis(ParticleSimulator.evolve)
v_x = (-p.y)/norm
29 85 LOAD_FAST 5 (p)
88 LOAD_ATTR 4 (y)
91 UNARY_NEGATIVE
92 LOAD_FAST 6 (norm)
95 BINARY_TRUE_DIVIDE
96 STORE_FAST 7 (v_x)
LOAD_FAST p LOAD_ATTR y
UNARY_NEGATIVE
BINARY_TRUE_DIVIDE
v_x STORE_FAST
dis 51
35
dis
Particle
memory_profiler line_profiler
memory_profiler
psutil https://github.com/giampaolo/psutil
memory_profiler
line_profiler memory_profiler
@profile
benchmark
benchmark 100000
Particle
def benchmark_memory():
particles = [Particle(uniform(-1.0, 1.0),
uniform(-1.0, 1.0),
uniform(-1.0, 1.0))
for i in range(100000)]
simulator = ParticleSimulator(particles)
simulator.evolve(0.001)
memory_profiler %mprun
memory_profiler mprof
run @profile
__slots__
class Particle:
__slots__ = ('x', 'y', 'ang_vel')
for
input = list(range(10))
for i, _ in enumerate(input):
input[i] += 1
Algorithms.ipynb
list.pop()
list.pop(0)
list.append(1)
list.insert(0, 1)
collections.deque
pop append popleft
appendleft
deque.pop()
deque.popleft()
deque.append(1)
deque.appendleft(1)
appendleft popleft
deque[0]
deque[N - 1]
deque[int(N / 2)]
list.index
bisect
bisect bisect.bisect
3
collection 3
insert bisect
collection = [1, 2, 4, 5, 6]
bisect.bisect(collection, 3)
# Result: 2
1 1000 2
2000 3 4000
100 1033
bisect.bisect
bisect.bisect_left
https://docs.python.org/3.5/library/bisect.html
bisect
list.index(a)
index_bisect(list, a)
hash
hash
"hello"
hash("hello")
# Result: -1182655621190490452
counter_dict
def counter_dict(items):
    counter = {}
    for item in items:
        if item not in counter:
            counter[item] = 1
        else:
            counter[item] += 1
    return counter
collections.defaultdict
defaultdict(int)
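As a rough sketch of the defaultdict-based version (the name counter_defaultdict matches the benchmark labels below):

from collections import defaultdict

def counter_defaultdict(items):
    counter = defaultdict(int)
    for item in items:
        counter[item] += 1
    return counter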
collections Counter
Counter
Counter(items)
counter_dict(items)
counter_defaultdict(items)
table
counter_dict
# Building an index
index = {}
for i, doc in enumerate(docs):
# We iterate over each term in the document
for word in doc.split():
# We build a list containing the indices
# where the term appears
if word not in index:
index[word] = [i]
else:
index[word].append(i)
results = index["table"]
result_documents = [docs[i] for i in results]
set
s t
s.union(t)
s.intersection(t)
s.difference(t)
cat table
cat
table
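A sketch of the same inverted index built with sets, which turns the "cat and table" query into a simple intersection (docs is the same list of documents as above):

index = {}
for i, doc in enumerate(docs):
    for word in doc.split():
        if word not in index:
            index[word] = {i}
        else:
            index[word].add(i)

# Documents that contain both "cat" and "table"
index['cat'].intersection(index['table'])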
bisect
list.pop
heapq
heapq.heapify
import heapq
collection = [10, 3, 3, 4, 5, 6]
heapq.heapify(collection)
heapq.heappop(collection)
# Returns: 3
1 heapq.heappush
heapq.heappush(collection, 1)
queue.PriorityQueue
PriorityQueue
PriorityQueue.put PriorityQueue.get
from queue import PriorityQueue

queue = PriorityQueue()
for element in collection:
    queue.put(element)

queue.get()
# Returns: 3
-1
(number, object)
queue = PriorityQueue()
queue.put((3, "priority 3"))
queue.put((2, "priority 2"))
queue.put((1, "priority 1"))
queue.get()
# Returns: (1, "priority 1")
patricia-trie
patricia-trie
random_string random_string
random_string
from random import choice
from string import ascii_uppercase

def random_string(length):
    """Produce a random string made of *length* uppercase ascii
    characters"""
    return ''.join(choice(ascii_uppercase) for i in range(length))
"AA" str.startswith
str.startswith
patricia-trie trie.iter
matches = list(strings_trie.iter('AA'))
%timeit list(strings_trie.iter('AA'))
10000 loops, best of 3: 60.1 µs per loop
patricia-trie
datrie marisa-trie
functools functools.lru_cache
sum2
sum2
"Calculating ..."
from functools import lru_cache

@lru_cache()
def sum2(a, b):
    print("Calculating {} + {}".format(a, b))
    return a + b
print(sum2(1, 2))
# Output:
# Calculating 1 + 2
# 3
print(sum2(1, 2))
# Output:
# 3
lru_cache
The lru_cache decorator accepts a maxsize argument (maxsize=None makes the cache unbounded):
@lru_cache(maxsize=16)
def sum2(a, b):
    ...
sum2
16
lru
lru_cache
cache_info
cache_clear
sum2.cache_info()
# Output: CacheInfo(hits=0, misses=1, maxsize=128, currsize=1)
sum2.cache_clear()
fibonacci
def fibonacci(n):
if n < 1:
return 1
else:
return fibonacci(n - 1) + fibonacci(n - 2)
# Non-memoized version
%timeit fibonacci(20)
100 loops, best of 3: 5.57 ms per loop
lru_cache
fibonacci
timeit.repeat
import timeit
setup_code = '''
from functools import lru_cache
from __main__ import fibonacci
fibonacci_memoized = lru_cache(maxsize=None)(fibonacci)
'''
results = timeit.repeat('fibonacci_memoized(20)',
setup=setup_code,
repeat=1000,
number=1)
print("Fibonacci took {:.2f} us".format(min(results)))
# Output: Fibonacci took 0.01 us
lru_cache
joblib
lru_cache
joblib Memory
Memory.cache
from joblib import Memory
memory = Memory(cachedir='/tmp/joblib')  # cachedir is a placeholder path

@memory.cache
def sum2(a, b):
    return a + b
lru_cache
cachedir Memory
Memory.cache
joblib
numpy
sum
def loop():
res = []
for i in range(100000):
res.append(i * i)
return sum(res)
def comprehension():
return sum([i * i for i in range(100000)])
def generator():
return sum(i * i for i in range(100000))
%timeit loop()
100 loops, best of 3: 16.1 ms per loop
%timeit comprehension()
100 loops, best of 3: 10.1 ms per loop
%timeit generator()
100 loops, best of 3: 12.4 ms per loop
dict
def loop():
res = {}
for i in range(100000):
res[i] = i
return res
def comprehension():
return {i: i for i in range(100000)}
%timeit loop()
100 loops, best of 3: 13.2 ms per loop
%timeit comprehension()
100 loops, best of 3: 12.8 ms per loop
filter map
def map_comprehension(numbers):
a = [n * 2 for n in numbers]
b = [n ** 2 for n in a]
c = [n ** 0.33 for n in b]
return max(c)
map
map
map
def map_normal(numbers):
a = map(lambda n: n * 2, numbers)
b = map(lambda n: n ** 2, a)
c = map(lambda n: n ** 0.33, b)
return max(c)
memory_profiler
%memit
%timeit
%load_ext memory_profiler
numbers = range(1000000)
%memit map_comprehension(numbers)
peak memory: 166.33 MiB, increment: 102.54 MiB
%memit map_normal(numbers)
peak memory: 71.04 MiB, increment: 0.00 MiB
102.54 MiB
0.00 MiB
itertools
numpy
numpy.ndarray
numexpr
numpy.ndarray
numpy.array
import numpy as np
a = np.array([0, 1, 2])
dtype
a dtype int64
a.dtype
# Result:
# dtype('int64')
float
dtype
astype
ndarray.shape
a.shape
# Result:
# (2, 3)
(2, 8) (4, 4) (2, 2, 4) ndarray.reshape
ndarray.shape
ndarray.reshape
a = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8,
9, 10, 11, 12, 13, 14, 15])
a.shape
# Output:
# (16,)
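For example, the 16-element array above can be rearranged into a 4 x 4 matrix with ndarray.reshape:

a.reshape(4, 4)
# Result:
# array([[ 0,  1,  2,  3],
#        [ 4,  5,  6,  7],
#        [ 8,  9, 10, 11],
#        [12, 13, 14, 15]])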
dtype
np.zeros((3, 3))
np.empty((3, 3))
np.ones((3, 3), dtype='float32')
numpy.random
(0, 1) numpy.random.rand
np.random.rand(3, 3)
np.zeros_like(a)
np.empty_like(a)
np.ones_like(a)
for
A = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8])
A[0]
# Result:
# 0
[a for a in A]
# Result:
# [0, 1, 2, 3, 4, 5, 6, 7, 8]
[] (3,3)
0
(0, 1)
A[0, 1] A[(0, 1)]
A[0, 1]
# Result:
# 1
A[0:2]
# Result:
# array([[0, 1, 2],
# [3, 4, 5]])
0:2
(2, 2)
A[0:2, 0:2]
# Result:
# array([[0, 1],
# [3, 4]])
A[0, 1] = 8
A[0:2, 0:2] = [[1, 1], [1, 1]]
a = np.array([1, 1, 1, 1])
a_view = a[0:2]
a_view[0] = 2
print(a)
# Output:
# [2 1 1 1]
a.flags.writeable = False
r_i
(10, 2)
r_i = np.random.rand(10, 2)
(10, 2)
(2, 10)
x_i = r_i[:, 0]
r_0 = r_i[0, :]
r_i[0]
r_i[0, :]
a idx
np.array([0, 2, 3])
(3,) 0 2 3
a = np.array([9, 8, 7, 6, 5, 4, 3, 2, 1, 0])
idx = np.array([0, 2, 3])
a[idx]
# Result:
# array([9, 7, 6])
(0, 2) (1, 3)
1 0
r_i = np.random.rand(10, 2)
r_i[:, [0, 1]] = r_i[:, [1, 0]]
bool bool
True
a = np.array([0, 1, 2, 3, 4, 5])
mask = np.array([True, False, True, False, False, False])
a[mask]
# Output:
# array([0, 2])
True
numpy.take numpy.compress
numpy.take
axis
r_i = np.random.rand(100, 2)
idx = np.arange(50) # integers 0 to 49
numpy.compress
numpy.compress
(2,2)
(2, 2)
A * 2
# Output:
# array([[2, 4],
# [6, 8]])
(3, 2) (2,)
(3, 2)
(3, 2) (2, 2)
5, 10, 2
(5, 1, 2)
5, 10, 2
5, 1, 2 → repeated
- - - -
5, 10, 2
numpy.newaxis
(5, 2) (5, 10, 2)
(5, 1, 2)
A = np.random.rand(5, 10, 2)
B = np.random.rand(5, 2)
A * B[:, np.newaxis, :]
a (3,) (3, 1)
b (3,) (1, 3)
np.sqrt(np.array([4, 9, 16]))
# Result:
# array([2., 3., 4.])
0 1
0.5 >
bool
a = np.random.rand(5, 3)
a > 0.3
# Result:
# array([[ True, False, True],
# [ True, True, True],
# [False, True, True],
# [ True, True, False],
# [ True, True, False]], dtype=bool)
bool
0.5
ndarray.sum
(5, 3) ndarray.sum
a = np.random.rand(5, 3)
a.sum(axis=0)
# Result:
# array([ 2.7454, 2.5517, 2.0303])
a.sum(axis=1)
# Result:
# array([ 1.7498, 1.2491, 1.8151, 1.9320, 0.5814])
0 (3,)
1 (5,)
(x**2, y**2)
numpy.sum
numpy.sqrt
r_i = np.random.rand(10, 2)
norm = np.sqrt((r_i ** 2).sum(axis=1))
print(norm)
# Output:
# [ 0.7314 0.9050 0.5063 0.2553 0.0778 0.9143 1.3245 0.9486 1.010 1.0212]
Chapter 1
ParticleSimulator.evolve
for i in range(nsteps):
for p in self.particles:
p.x += d_x
p.y += d_y
(nparticles, 2)
(nparticles,) nparticles
r_i ang_vel_i
v_x = -y / norm
v_y = x / norm
r_i
v_i ang_vel_i
timestep ang_vel_i (nparticles,)
v_i (nparticles, 2) numpy.newaxis
ParticleSimulator.evolve_numpy
ParticleSimulator.evolve_python
for i in range(nsteps):
for i, p in enumerate(self.particles):
p.x, p.y = r_i[i]
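A sketch of how evolve_numpy can be written using the broadcasting rules described above (the intermediate variable names are assumptions):

def evolve_numpy(self, dt):
    timestep = 0.00001
    nsteps = int(dt/timestep)

    r_i = np.array([[p.x, p.y] for p in self.particles])
    ang_vel_i = np.array([p.ang_vel for p in self.particles])

    for i in range(nsteps):
        norm_i = np.sqrt((r_i ** 2).sum(axis=1))
        v_i = r_i[:, [1, 0]]         # swap the x and y columns (fancy indexing returns a copy)
        v_i[:, 0] *= -1              # v = (-y, x)
        v_i /= norm_i[:, np.newaxis]
        d_i = timestep * ang_vel_i[:, np.newaxis] * v_i
        r_i += d_i

    for i, p in enumerate(self.particles):
        p.x, p.y = r_i[i]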
simulator = ParticleSimulator(particles)
if method=='python':
simulator.evolve_python(0.1)
numexpr.evaluate
a + b * c
import numexpr as ne

a = np.random.rand(10000)
b = np.random.rand(10000)
c = np.random.rand(10000)
d = ne.evaluate('a + b * c')
numexpr
(i,j)
r = np.random.rand(10000, 2)
r_i = r[:, np.newaxis]
r_j = r[np.newaxis, :]
d_ij = r_j - r_i
r = np.random.rand(10000, 2)
r_i = r[:, np.newaxis]
r_j = r[np.newaxis, :]
numexpr
numexpr
numexpr
distance_matrix.py
distance_matrix_numpy distance_matrix_numexpr
numexpr
numexpr
pandas.Series
pandas.DataFrame pandas.Panel
pandas pd
True False
pd.Series
Series index
import pandas as pd

patients = [0, 1, 2, 3]
effective = [True, True, False, False]

effective_series = pd.Series(effective, index=patients)
pd.Series
pd.DataFrame
pd.DataFrame pd.Series
pd.DataFrame
columns = {
"sys_initial": [120, 126, 130, 115],
"dia_initial": [75, 85, 90, 87],
"sys_final": [115, 123, 130, 118],
"dia_final": [70, 82, 92, 87]
}
df = pd.DataFrame(columns, index=patients)
pd.DataFrame pd.Series
pd.DataFrame pd.Series
columns = {
"sys_initial": pd.Series([120, 126, 130, 115], index=patients),
"dia_initial": pd.Series([75, 85, 90, 87], index=patients),
"sys_final": pd.Series([115, 123, 130, 118], index=patients),
"dia_final": pd.Series([70, 82, 92, 87], index=patients)
}
df = pd.DataFrame(columns)
pd.DataFrame pd.Series
pd.Series.head pd.DataFrame.head
effective_series.head()
# Output:
# a True
# b True
# c False
# d False
# dtype: bool
df.head()
# Output:
# dia_final dia_initial sys_final sys_initial
# a 70 75 115 120
# b 82 85 123 126
# c 92 90 130 130
# d 87 87 118 115
pd.DataFrame pd.Series
pd.Panel pd.DataFrames
pd.Panel pd.Series pd.DataFrame
pd.Panel http://pandas.pydata.org/pandas-docs/stable/dsintro.html#panel
pd.Series
pd.Series.loc
effective_series.loc["a"]
# Result:
# True
pd.Series.iloc
effective_series.iloc[0]
# Result:
# True
pd.Series.ix
pd.Series
effective_series.ix["a"] # By key
effective_series.ix[0] # By position
# Equivalent
effective_series["a"] # By key
effective_series[0] # By position
loc iloc
pd.DataFrame
pd.DataFrame.loc pd.DataFrame.iloc
df.loc["a"]
df.iloc[0]
# Result:
# dia_final 70
# dia_initial 75
# sys_final 115
# sys_initial 120
# Name: a, dtype: int64
pd.Series
loc iloc
df.iloc[0, 1] # is equivalent to
df.iloc[0].iloc[1]
pd.DataFrame ix
"sys_initial"
df.ix[0, "sys_initial"]
pd.DataFrame
iloc
pd.DataFrame.column
bool int
pd.Series.sort_index
pd.DataFrame
pd.Series
np.array
pd.Series pd.DataFrame
pd.Series
NaN
# Matching index
a = pd.Series([1, 2, 3], index=["a", "b", "c"])
b = pd.Series([4, 5, 6], index=["a", "b", "c"])
a + b
# Result:
# a 5
# b 7
# c 9
# dtype: int64
# Mismatching index
b = pd.Series([4, 5, 6], index=["a", "b", "d"])
# Result:
# a 5.0
# b 7.0
# c NaN
# d NaN
# dtype: float64
map apply applymap
pd.Series.map
pd.Series
superstar pd.Series
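A minimal superstar function consistent with the output below, applied to the numeric series a defined earlier:

def superstar(x):
    return '*' + str(x) + '*'

a.map(superstar)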
# Result:
# a *1*
# b *2*
# c *3*
# dtype: object
pd.DataFrame.applymap pd.Series.map
DataFrames
df.applymap(superstar)
# Result:
# dia_final dia_initial sys_final sys_initial
# a *70* *75* *115* *120*
# b *82* *85* *123* *126*
# c *92* *90* *130* *130*
# d *87* *87* *118* *115*
pd.DataFrame.apply
0 1
apply pd.Series
df.apply(superstar, axis=0)
# Result:
# dia_final *a 70nb 82nc 92nd 87nName: dia...
# dia_initial *a 75nb 85nc 90nd 87nName: dia...
# sys_final *a 115nb 123nc 130nd 118nName:...
# sys_initial *a 120nb 126nc 130nd 115nName:...
# dtype: object
df.apply(superstar, axis=1)
# Result:
# a *dia_final 70ndia_initial 75nsys_f...
# b *dia_final 82ndia_initial 85nsys_f...
# c *dia_final 92ndia_initial 90nsys_f...
# d *dia_final 87ndia_initial 87nsys_f...
# dtype: object
numexpr eval
df.eval("sys_final - sys_initial")
# Result:
# a -5
# b -3
# c 0
# d 3
# dtype: int64
pd.DataFrame.eval inplace=True
pd.DataFrame
drug_admst
columns = {
"sys_initial": [120, 126, 130, 115, 150, 117],
"dia_initial": [75, 85, 90, 87, 90, 74],
"sys_final": [115, 123, 130, 118, 130, 121],
"dia_final": [70, 82, 92, 87, 85, 74],
"drug_admst": [True, True, True, True, False, False]
}
df = pd.DataFrame(columns, index=patients)
drug_amst
pd.DataFrame.groupby DataFrameGroupBy
pd.DataFrame drug_admst
df.groupby('drug_admst')
for value, group in df.groupby('drug_admst'):
print("Value: {}".format(value))
print("Group DataFrame:")
print(group)
DataFrameGroupBy
hospitals = pd.DataFrame(
{ "name" : ["City 1", "City 2", "City 3"],
"address" : ["Address 1", "Address 2", "Address 3"],
"city": ["City 1", "City 2", "City 3"] },
index=["H1", "H2", "H3"])
hospital_id hospitals
hospital_dict = {
"H1": ("City 1", "Name 1", "Address 1"),
"H2": ("City 2", "Name 2", "Address 2"),
"H3": ("City 3", "Name 3", "Address 3")
}
cities = [hospital_dict[key][0]
for key in hospital_id]
hospital_id
pd.DataFrame.join
pd.DataFrame
numexpr
.pyx cython
Hello, World!
hello.pyx
def hello():
print('Hello, World!')
hello.c
/usr/include/python3.5/
distutils
sysconfig.get_python_inc
python -c "from distutils import sysconfig;
print(sysconfig.get_python_inc())"
hello.so
hello.pyx -3
hello.c
-I
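A sketch of the manual compilation steps (the exact gcc flags vary by platform and are assumptions):

$ cython -3 hello.pyx
$ gcc -shared -fPIC -I /usr/include/python3.5/ -o hello.so hello.c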
distutils
setup.py .pyx
hello.pyx
setup.py
from distutils.core import setup
from Cython.Build import cythonize

setup(
    name='Hello',
    ext_modules=cythonize('hello.pyx')
)
setup
cythonize setup
cythonize
distutils setup.py
build_ext
ext_modules --inplace hello.so
pyximport
pyximport.install()
.pyx
pyximport
pyximport
cythonmagic
load_ext
%load_ext cythonmagic
%%cython
hello_snippet
%%cython
def hello_snippet():
print("Hello, Cython!")
hello_snippet()
Hello, Cython!
cdef cdef
cdef
i
cdef int i
cdef
'hello' a
a = 'hello'
a 'hello'
1
a = 1
1 a
a int double
%%cython
cdef int i
i = 3.0
int
int
example
%%cython
def example():
cdef int i, j=0
for i in range(100):
j += 1
return j
example()
# Result:
# 100
def example_python():
j=0
for i in range(100):
j += 1
return j
%timeit example()
10000000 loops, best of 3: 25 ns per loop
%timeit example_python()
100000 loops, best of 3: 2.74 us per loop
struct enum
typedef
object
object
float int
cdef int a = 0
cdef double b
b = <double> a
max_python
def max_python(int a, int b):
return a if a > b else b
cdef
cdef
cpdef
cpdef
cdef
inline
max
cdef class
Point
double
struct
cdef class
Point
double float int
norm Point
Point
AttributeError
public
readonly
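A sketch of such an extension type; declaring the attributes as public makes them accessible from Python code (otherwise access from Python raises AttributeError):

cdef class Point:
    cdef public double x
    cdef public double y

    def __init__(self, double x, double y):
        self.x = x
        self.y = y

cdef double norm(Point p):
    return (p.x**2 + p.y**2)**0.5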
cpdef
cimport
max min
.pyx
max min
mathlib.pxd
.pyx mathlib.pyx
mathlib
distance.pyx
chebyshev
chebyshev max
mathlib.pxd cimport
double
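A sketch of how the declaration file and the two modules fit together (the file names follow the fragments above):

# file: mathlib.pxd
cdef int max(int a, int b)

# file: mathlib.pyx
cdef int max(int a, int b):
    return a if a > b else b

# file: distance.pyx
from mathlib cimport max

def chebyshev(int x1, int y1, int x2, int y2):
    return max(abs(x1 - x2), abs(y1 - y2))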
&
printf libc.stdio
%%cython
cdef double a
from libc.stdio cimport printf
printf("%p", &a)
# Output:
# 0x7fc8bb611210
* *
*
cdef double a
cdef double *a_pointer
a_pointer = &a

a = 3.0
print(a_pointer[0]) # prints 3.0
double
5 2
arr
arr
%%cython
from libc.stdio cimport printf
cdef double arr[10]
printf("%pn", arr)
printf("%pn", &arr[0])
# Output
# 0x7ff6de204220
# 0x7ff6de204220
numpy
ndarray
cimport numpy
c_np
numpy
double
%%cython
import numpy as np
def numpy_bench_py():
py_arr = np.random.rand(1000)
cdef int i
for i in range(1000):
py_arr[i] += 1
ndarray
c_arr c_np.ndarray numpy
%%cython
import numpy as np
cimport numpy as c_np
def numpy_bench_c():
cdef c_np.ndarray[double, ndim=1] c_arr
c_arr = np.random.rand(1000)
cdef int i
for i in range(1000):
c_arr[i] += 1
timeit
%timeit numpy_bench_c()
100000 loops, best of 3: 11.5 us per loop
%timeit numpy_bench_py()
1000 loops, best of 3: 603 us per loop
cdef int[:] a
cdef double[:, :] b
bytes array.array
import numpy as np
Chapter 3
cdef int[:, :, :] a
arr[0, :, :] # Is a 2-dimensional memoryview
arr[0, 0, :] # Is a 1-dimensional memoryview
arr[0, 0, 0] # Is an int
import numpy as np
cdef double[:, :] b
cdef double[:] r
b = np.random.rand(10, 3)
r = np.zeros(3, dtype='float64')
ParticleSimulator.evolve
Chapter 3
evolve
evolve_numpy
for i in range(nsteps):
norm_i = np.sqrt((r_i ** 2).sum(axis=1))
r_i += d_i
for i, p in enumerate(self.particles):
p.x, p.y = r_i[i]
cevolve.pyx
c_evolve
# file: simul.py
def evolve_cython(self, dt):
    timestep = 0.00001
    nsteps = int(dt/timestep)

    r_i = np.array([[p.x, p.y] for p in self.particles])
    ang_speed_i = np.array([p.ang_vel for p in self.particles])

    c_evolve(r_i, ang_speed_i, timestep, nsteps)

    for i, p in enumerate(self.particles):
        p.x, p.y = r_i[i]
# file: cevolve.pyx
import numpy as np
def c_evolve(r_i, ang_speed_i, timestep, nsteps):
    for i in range(nsteps):
        norm_i = np.sqrt((r_i ** 2).sum(axis=1))
        v_i = r_i[:, [1, 0]]
        v_i[:, 0] *= -1
        v_i /= norm_i[:, np.newaxis]
        d_i = timestep * ang_speed_i[:, np.newaxis] * v_i
        r_i += d_i
c_evolve r_i
double
int float32
cdef int i, j
cdef int nparticles = r_i.shape[0]
cdef double norm, x, y, vx, vy, dx, dy, ang_speed
for i in range(nsteps):
for j in range(nparticles):
x = r_i[j, 0]
y = r_i[j, 1]
ang_speed = ang_speed_i[j]
norm = sqrt(x ** 2 + y ** 2)
vx = (-y)/norm
vy = x/norm
dx = timestep * ang_speed * vx
dy = timestep * ang_speed * vy
r_i[j, 0] += dx
r_i[j, 1] += dy
x y ang_speed norm vx vy dx dy
sqrt sqrt
math numpy
sqrt
libc.math
-a
-a
v_y = x/norm
0 ZeroDivisionError x =
r_i[j, 0]
http://docs.cython.org/src/reference/comp
ilation.html#compiler-directives
cython.boundscheck
cimport cython
@cython.boundscheck(False)
def myfunction():
# Code here
cython.boundscheck
with cython.boundscheck(False):
# Code here
# cython: boundscheck=False
-X
c_evolve boundscheck
cdivision ZeroDivisionError
cimport cython
@cython.boundscheck(False)
@cython.cdivision(True)
def c_evolve(double[:, :] r_i,
double[:] ang_speed_i,
double timestep,
int nsteps):
cProfile
cheb.py
import numpy as np
from distance import chebyshev
def benchmark():
a = np.random.rand(100, 2)
b = np.random.rand(100, 2)
for x1, y1 in a:
    for x2, y2 in b:
        chebyshev(x1, y1, x2, y2)
max min
profile=True mathlib.pyx
# cython: profile=True
%prun
import cheb
%prun cheb.benchmark()
max
benchmark
jupyter notebook
%load_ext cython
%%cython
cheb.py
%%cython
import numpy as np
cdef int chebyshev(int x1, int y1, int x2, int y2):
return max(abs(x1 - x2), abs(y1 - y2))
def c_benchmark():
a = np.random.rand(1000, 2)
b = np.random.rand(1000, 2)
for x1, y1 in a:
    for x2, y2 in b:
        chebyshev(x1, y1, x2, y2)
%%cython -a
-a
%prun %timeit
%prun
line_profiler Chapter 1
linetrace=True binding=True
CYTHON_TRACE=1
%%cython
import numpy as np
def c_benchmark():
a = np.random.rand(1000, 2)
b = np.random.rand(1000, 2)
for x1, y1 in a:
    for x2, y2 in b:
        chebyshev(x1, y1, x2, y2)
%lprun
cProfile
Chapter 4
Numba.ipynb
def sum_sq(a):
    result = 0
    N = len(a)
    for i in range(N):
        result += a[i]**2
    return result
nb.jit
import numba as nb

@nb.jit
def sum_sq(a):
    ...
nb.jit
a
py_func
import numpy as np
x = np.random.rand(10000)
# Original
%timeit sum_sq.py_func(x)
100 loops, best of 3: 6.11 ms per loop
# Numba
%timeit sum_sq(x)
100000 loops, best of 3: 11.7 µs per loop
%timeit (x**2).sum()
10000 loops, best of 3: 14.8 µs per loop
sum_sq
sum_sq
x_list = x.tolist()
%timeit sum_sq(x_list)
1000 loops, best of 3: 199 µs per loop
nb.jit
sum_sq
signatures
sum_sq
sum_sq.signatures
sum_sq.signatures
# Output:
# []
float64
float32
sum_sq.signatures
x = np.random.rand(1000).astype('float64')
sum_sq(x)
sum_sq.signatures
# Result:
# [(array(float64, 1d, C),)]
x = np.random.rand(1000).astype('float32')
sum_sq(x)
sum_sq.signatures
# Result:
# [(array(float64, 1d, C),), (array(float32, 1d, C),)]
nb.jit
nb.types
nb
[:]
float64
@nb.jit((nb.float64[:],))
def sum_sq(a):
x float32
TypeError
sum_sq(x.astype('float32'))
# TypeError: No matching definition for argument type(s)
array(float32, 1d, C)
float64 float64
float64(float64) [:]
sum_sq
@nb.jit("float64(float64[:])")
def sum_sq(a):
@nb.jit(["float64(float64[:])",
"float64(float32[:])"])
def sum_sq(a):
inspect_types
sum_sq
sum_sq.inspect_types()
N = len(a)
N = len(a)
a float64 LINE 4
len
float64 int64
@nb.jit
def concatenate(strings):
result = ''
for s in strings:
result += s
return result
concatenate(['hello', 'world'])
concatenate.signatures
# Output: [(reflected list(str),)]
concatenate.inspect_types()
concatenate.inspect_types()
result = ''
pyobject
x = ['hello'] * 1000
%timeit concatenate.py_func(x)
10000 loops, best of 3: 111 µs per loop
%timeit concatenate(x)
1000 loops, best of 3: 317 µs per loop
nopython=True
nb.jit
@nb.jit(nopython=True)
def concatenate(strings):
result = ''
for s in strings:
result += s
return result
concatenate(x)
# Exception:
# TypingError: Failed at nopython (nopython frontend)
ufuncs
ufunc Chapter 3
np.log ufunc
np.sum np.difference
np.vectorize
import numpy as np
np.vectorize
@np.vectorize
def cantor(a, b):
return int(0.5 * (a + b)*(a + b + 1) + b)
cantor(np.array([1, 2]), 2)
# Result:
# array([ 8, 12])
nb.vectorize np.vectorized
cantor_py
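The Numba version is obtained by swapping the decorator; a sketch (cantor_py denotes the undecorated pure-Python loop used for comparison):

import numba as nb

@nb.vectorize
def cantor(a, b):
    return int(0.5 * (a + b)*(a + b + 1) + b)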
# Pure Python
%timeit cantor_py(x1, x2)
100 loops, best of 3: 6.06 ms per loop
# Numba
%timeit cantor(x1, x2)
100000 loops, best of 3: 15 µs per loop
# NumPy
%timeit (0.5 * (x1 + x2)*(x1 + x2 + 1) + x2).astype(int)
10000 loops, best of 3: 57.1 µs per loop
target="cpu" target="gpu"
nb.vectorize
gufunc
np.matmul
np.matmul
a = np.random.rand(3, 3)
b = np.random.rand(3, 3)
c = np.matmul(a, b)
c.shape
# Result:
# (3, 3)
ufunc
np.matmul
(3, 3) np.matmul
(3, 3)
a = np.random.rand(10, 3, 3)
b = np.random.rand(10, 3, 3)
c = np.matmul(a, b)
c.shape
# Output
# (10, 3, 3)
(3, 3) (10, 3, 3) np.matmul
(3, 3)
(10, 3, 3)
a = np.random.rand(10, 3, 3)
b = np.random.rand(3, 3) # Broadcasted to shape (10, 3, 3)
c = np.matmul(a, b)
c.shape
# Result:
# (10, 3, 3)
nb.guvectorize
gufunc gufunc
nb.guvectorize
euclidean
nb.guvectorize
a b float64[:]
float64[:]
(n)
()
out
n
k
(n, m)
euclidean
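A sketch of a euclidean gufunc using the (n),(n)->() layout; the scalar output is written into the one-element out argument:

import math
import numpy as np
import numba as nb

@nb.guvectorize([(nb.float64[:], nb.float64[:], nb.float64[:])],
                '(n),(n)->()')
def euclidean(a, b, out):
    N = a.shape[0]
    out[0] = 0.0
    for i in range(N):
        out[0] += (a[i] - b[i])**2
    out[0] = math.sqrt(out[0])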
a = np.random.rand(2)
b = np.random.rand(2)
c = euclidean(a, b) # Shape: (1,)
a = np.random.rand(10, 2)
b = np.random.rand(10, 2)
c = euclidean(a, b) # Shape: (10,)
a = np.random.rand(10, 2)
b = np.random.rand(2)
c = euclidean(a, b) # Shape: (10,)
euclidean
euclidean
a = np.random.rand(10000, 2)
b = np.random.rand(10000, 2)
%timeit euclidean(a, b)
10000 loops, best of 3: 35.6 µs per loop
Node
Node
class Node:
def __init__(self, value):
self.next = None
self.value = value
Node
LinkedList
LinkedList
LinkedList.push_front
class LinkedList:
def __init__(self):
self.head = None
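The push_front method used below (defined inside the LinkedList class) can be sketched as:

    def push_front(self, value):
        # The new node becomes the head of the list
        node = Node(value)
        node.next = self.head
        self.head = node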
LinkedList.show
def show(self):
node = self.head
while node is not None:
print(node.value)
node = node.next
LinkedList
lst = LinkedList()
lst.push_front(1)
lst.push_front(2)
lst.push_front(3)
lst.show()
# Output:
# 3
# 2
# 1
sum_list
@nb.jit
def sum_list(lst):
result = 0
node = lst.head
while node is not None:
result += node.value
node = node.next
return result
sum_list nb.jit
lst = LinkedList()
[lst.push_front(i) for i in range(10000)]
%timeit sum_list.py_func(lst)
1000 loops, best of 3: 2.36 ms per loop
%timeit sum_list(lst)
100 loops, best of 3: 1.75 ms per loop
nb.jitclass
Node int64 value Node next nb.jitclass
nb.deferred_type()
Node node_type
nb.deferred_type()
nb.optional
node_type = nb.deferred_type()
node_spec = [
('next', nb.optional(node_type)),
('value', nb.int64)
]
@nb.jitclass(node_spec)
class Node:
# Body of Node is unchanged
node_type.define(Node.class_type.instance_type)
LinkedList
head nb.jitclass
ll_spec = [
('head', nb.optional(Node.class_type.instance_type))
]
@nb.jitclass(ll_spec)
class LinkedList:
# Body of LinkedList is unchanged
sum_list
LinkedList
lst = LinkedList()
[lst.push_front(i) for i in range(10000)]
%timeit sum_list(lst)
1000 loops, best of 3: 345 µs per loop
%timeit sum_list.py_func(lst)
100 loops, best of 3: 3.36 ms per loop
sum_list.py_func
ValueError
a = [[0, 1, 2],
[3, 4],
[5, 6, 7, 8]]
@nb.jit
def sum_sublists(a):
result = []
for sublist in a:
result.append(sum(sublist))
return result
sum_sublists(a)
# ValueError: cannot compute fingerprint of empty list
@nb.jit
def sum_sublists(a):
result = [0]
for sublist in a:
result.append(sum(sublist))
return result[1:]
with
try except
http://pypy.org/download.html
bin/pypy
python -V
numpy matplotlib
numpy matplotlib
Chapter 1
python -V
timeit
matplotlib.pyplot
export matplotlib
simul.py
timeit perf
nb.jitclass
asyncio
network_request
time.sleep
import time
def network_request(number):
time.sleep(1.0)
return {"success": True, "result": number ** 2}
fetch_square
network_request
def fetch_square(number):
response = network_request(number)
if response["success"]:
print("Result is: {}".format(response["result"]))
fetch_square(2)
# Output:
# Result is: 4
fetch_square
fetch_square(2)
fetch_square(3)
fetch_square(4)
# Output:
# Result is: 4
# Result is: 9
# Result is: 16
time.sleep
time.sleep threading.Timer
wait_and_print
def wait_and_print(msg):
time.sleep(1.0)
print(msg)
threading.Timer threading.Timer
Timer.start
import threading

def wait_and_print_async(msg):
    def callback():
        print(msg)
    timer = threading.Timer(1.0, callback)
    timer.start()
wait_and_print_async
threading.Timer
threading.Timer
wait_and_print
<wait...>
# Syncronous
wait_and_print("First call")
wait_and_print("Second call")
print("After call")
# Output:
# <wait...>
# First call
# <wait...>
# Second call
# After call
# Async
wait_and_print_async("First call async")
wait_and_print_async("Second call async")
print("After submission")
# Output:
# After submission
# <wait...>
# First call
# Second call
wait_and_print_async
"After submission"
network_request
network_request_async
network_request_async network_request_async
network_request_async
on_done
timer_done
timer.Timer on_done
def network_request_async(number, on_done):
    def timer_done():
        on_done({"success": True,
                 "result": number ** 2})
    timer = threading.Timer(1.0, timer_done)
    timer.start()
network_request_async timer.Timer
def on_done(result):
print(result)
network_request_async(2, on_done)
network_request_async(2, on_done)
network_request_async(3, on_done)
network_request_async(4, on_done)
print("After submission")
network_request_async fetch_square
fetch_square
on_done network_request_async
def fetch_square(number):
def on_done(response):
if response["success"]:
print("Result is: {}".format(response["result"]))
network_request_async(number, on_done)
concurrent.futures.Future Future
fut = Future()
# Result:
# <Future at 0x7f03e41599e8 state=pending>
Future.set_result
fut.set_result("Hello")
# Result:
# <Future at 0x7f03e41599e8 state=finished returned str>
fut.result()
# Result:
# "Hello"
Future
Future.result
Future.add_done_callback
Future
Future.result()
fut = Future()
fut.add_done_callback(lambda future: print(future.result(),
flush=True))
fut.set_result("Hello")
# Output:
# Hello
network_request_async
Future
on_done callback
Future.add_done_callback
Future.set_result threading.Timer
from concurrent.futures import Future
def network_request_async(number):
future = Future()
result = {"success": True, "result": number ** 2}
timer = threading.Timer(1.0, lambda: future.set_result(result))
timer.start()
return future
fut = network_request_async(2)
Future
Future.add_done_callback
fetch_square
def fetch_square(number):
fut = network_request_async(number)
def on_done_future(future):
response = future.result()
if response["success"]:
print("Result is: {}".format(response["result"]))
fut.add_done_callback(on_done_future)
threading.Timer
Timer Timer.done
True
class Timer:
    def __init__(self, timeout):
        self.timeout = timeout
        self.start = time.time()
    def done(self):
        return time.time() - self.start > self.timeout
Timer.done
timer = Timer(1.0)
while True:
if timer.done():
print("Timer is done!")
break
threading.Timer
Timer.on_timer_done
class Timer:
# ... previous code
def on_timer_done(self, callback):
self.callback = callback
on_timer_done
timer.callback
timer = Timer(1.0)
timer.on_timer_done(lambda: print("Timer is done!"))
while True:
if timer.done():
break
Timer
timers
timers = []
timer1 = Timer(1.0)
timer1.on_timer_done(lambda: print("First timer is done!"))
timer2 = Timer(2.0)
timer2.on_timer_done(lambda: print("Second timer is done!"))
timers.append(timer1)
timers.append(timer2)
while True:
for timer in timers:
if timer.done():
timer.callback()
timers.remove(timer)
# If no more timers are left, we exit the loop
if len(timers) == 0:
break
time.sleep
time.sleep
select
asyncio
asyncio
async await
asyncio asyncio
asyncio.get_event_loop()
loop.call_later
loop.stop
loop.run_forever
import asyncio
loop = asyncio.get_event_loop()
def callback():
print("Hello, asyncio")
loop.stop()
loop.call_later(1.0, callback)
loop.run_forever()
yield
range_generator
0 n
def range_generator(n):
i = 0
while i < n:
print("Generating value {}".format(i))
yield i
i += 1
range_generator
generator = range_generator(3)
generator
# Result:
# <generator object range_generator at 0x7f03e418ba40>
next
next(generator)
# Output:
# Generating value 0
next(generator)
# Output:
# Generating value 1
next yield
next
yield
yield
message = yield
send
def parrot():
while True:
message = yield
print("Parrot says: {}".format(message))
generator = parrot()
generator.send(None)
generator.send("Hello")
generator.send("World")
generator.send(None)
yield
parrot
asyncio yield
async def hello():
    print("Hello, async!")

coro = hello()
coro
# Output:
# <coroutine object hello at 0x7f314846bd58>
hello
asyncio
next asyncio
run_until_complete
loop = asyncio.get_event_loop()
loop.run_until_complete(coro)
async def
asyncio
await
asyncio.sleep
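A sketch of the coroutine version of wait_and_print using async def and await:

async def wait_and_print(msg):
    await asyncio.sleep(1)
    print("Message: ", msg)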
loop.run_until_complete(wait_and_print("Hello"))
await
awaitable await
network_request time.sleep
asyncio.sleep
fetch_square
network_request
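A sketch of the coroutine versions, where time.sleep is replaced by asyncio.sleep and fetch_square simply awaits the response:

async def network_request(number):
    await asyncio.sleep(1.0)
    return {"success": True, "result": number ** 2}

async def fetch_square(number):
    response = await network_request(number)
    if response["success"]:
        print("Result is: {}".format(response["result"]))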
loop.run_until_complete
loop.run_until_complete(fetch_square(2))
loop.run_until_complete(fetch_square(3))
loop.run_until_complete(fetch_square(4))
run_until_complete
loop.run_forever
asyncio ensure_future
ensure_future
fetch_square
asyncio.ensure_future(fetch_square(2))
asyncio.ensure_future(fetch_square(3))
asyncio.ensure_future(fetch_square(4))
loop.run_forever()
# Hit Ctrl-C to stop the loop
asyncio.ensure_future
Task Future
asyncio
asyncio
Executor
ThreadPoolExecutor concurrent.futures
workers
max_workers
ThreadPoolExecutor
wait_and_return
submit
def wait_and_return(msg):
time.sleep(1)
return msg
executor.submit
asyncio loop.run_in_executor
executor.submit
loop.run_until_complete
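A sketch of how the executor is created and the blocking function submitted to the event loop (max_workers=3 is an assumption):

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=3)
fut = loop.run_in_executor(executor, wait_and_return, "Hello, executor")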
loop.run_until_complete(fut)
# Result:
# 'Hello, executor'
requests
requests.get
import requests
loop.run_until_complete(fetch_urls(['http://www.google.com',
                                    'http://www.example.com',
                                    'http://www.facebook.com']))
# Result
# []
fetch_url
asyncio
asyncio.ensure_future asyncio.gather
asyncio.gather
def fetch_urls(urls):
return asyncio.gather(*[loop.run_in_executor
(executor, requests.get, url)
for url in urls])
aiohttp
http://reactivex.io/
Observable.from_iterable
obs Observable.subscribe
obs = Observable.from_iterable(range(4))
obs.subscribe(print)
# Output:
# 0
# 1
# 2
# 3
__next__
next
iter next for
print("Next")
print(next(iterator))
print(next(iterator))
print("For loop")
for i in iterator:
print(i)
# Output:
# Next
# 1
# 2
# For loop
# 3
# 4
# 5
next
next
Observable.subscribe
on_next on_completed
obs = Observable.from_iterable(range(4))
obs.subscribe(on_next=lambda x: print("Next element: {}".format(x)),
              on_completed=lambda: print("No more data"))
# Output:
# Next element: 0
# Next element: 1
# Next element: 2
# Next element: 3
# No more data
take
take n
obs = Observable.from_iterable(range(100000))
obs2 = obs.take(4)
obs2.subscribe(print)
# Output:
# 0
# 1
# 2
# 3
map
map
(Observable.from_iterable(range(4))
.map(lambda x: x**2)
.subscribe(print))
# Output:
# 0
# 1
# 4
# 9
group_by
group_by
group_by
group_by
group_by
group_by lambda x: x % 2
0 1
obs = (Observable.from_iterable(range(4))
       .group_by(lambda x: x % 2))
obs
obs.subscribe(print)
# <rx.linq.groupedobservable.GroupedObservable object at
0x7f0fba51f9e8>
# <rx.linq.groupedobservable.GroupedObservable object at
0x7f0fba51fa58>
key
obs.subscribe(lambda x: print("group key: ", x.key))
# Output:
# group key: 0
# group key: 1
obs.take(1).subscribe(lambda x: x.subscribe(print))
# Output:
# 0
# 2
group_by
merge_all concat_all
merge_all concat_all
concat_all
group_by merge_all
group_by
obs.merge_all().subscribe(print)
# Output
# 0
# 1
# 2
# 3
concat_all
make_replay
concat_all
def make_replay(a):
result = a.replay(None)
result.connect()
return result
obs.map(make_replay).concat_all().subscribe(print)
# Output
# 0
# 2
# 1
# 3
merge concat
Observable.from_iterable
Observable.interval period
obs
take
obs = Observable.interval(1000)
obs.take(4).subscribe(print)
# Output:
# 0
# 1
# 2
# 3
Observable.interval
time.time()
import time
start = time.time()
obs = Observable.interval(1000).map(lambda a:
(a, time.time() - start))
0
subscribe(print)
Observable.interval
start = time.time()
obs = Observable.interval(1000).map(lambda a:
(a, time.time() - start))
publish
ConnectableObservable
connect
publish connect
start = time.time()
obs = (Observable.interval(1000)
       .map(lambda a: (a, time.time() - start))
       .publish())
obs.take(4).subscribe(lambda x: print("First subscriber: {}".format(x)))
obs.connect()  # Data production starts here
time.sleep(2)
obs.take(4).subscribe(lambda x: print("Second subscriber: {}".format(x)))
# Output:
# First subscriber: (0, 1.0016899108886719)
# First subscriber: (1, 2.0027990341186523)
# First subscriber: (2, 3.003532648086548)
# Second subscriber: (2, 3.003532648086548)
# First subscriber: (3, 4.004265308380127)
# Second subscriber: (3, 4.004265308380127)
# Second subscriber: (4, 5.005320310592651)
# Second subscriber: (5, 6.005795240402222)
publish
connect connect
publish replay
publish replay
import time
start = time.time()
obs = (Observable.interval(1000)
       .map(lambda a: (a, time.time() - start))
       .replay(None))
obs.connect()
obs.take(4).subscribe(lambda x: print("First subscriber: {}".format(x)))
time.sleep(2)
obs.take(4).subscribe(lambda x: print("Second subscriber: {}".format(x)))
Subject
Subject
on_next
s = Subject()
s.subscribe(lambda a: print("Subject emitted value: {}".format(a)))
s.on_next(1)
# Subject emitted value: 1
s.on_next(2)
# Subject emitted value: 2
Subject
cpu_monitor.py
psutil
psutil.cpu_percent
import psutil
psutil.cpu_percent()
# Result: 9.7
Observable.interval
map
cpu_data = (Observable
.interval(100) # Each 100 milliseconds
.map(lambda x: psutil.cpu_percent())
.publish())
cpu_data.connect() # Start producing data
cpu_data.take(4).subscribe(print)
# Output:
# 12.5
# 5.6
# 4.5
# 9.6
matplotlib
cpu_data
monitor_cpu
cpu_data
buffer_with_count
npoints
1
import numpy as np
from matplotlib import pyplot as plt

def monitor_cpu(npoints):
    lines, = plt.plot([], [])
    plt.xlim(0, npoints)
    plt.ylim(0, 100) # 0 to 100 percent

    cpu_data_window = cpu_data.buffer_with_count(npoints, 1)

    def update_plot(cpu_readings):
        lines.set_xdata(np.arange(npoints))
        lines.set_ydata(np.array(cpu_readings))
        plt.draw()

    cpu_data_window.subscribe(update_plot)
    plt.show()
buffer_with_count map
True False
alertpoints = 4
high_cpu = (cpu_data
            .buffer_with_count(alertpoints, 1)
            .map(lambda readings: all(r > 20 for r in readings)))
high_cpu matplotlib
asyncio
asyncio
multiprocessing
ProcessPoolExecutor
multiprocessing
multiprocessing.Process __init__
Process.run Process
id
import multiprocessing
import time
class Process(multiprocessing.Process):
def __init__(self, id):
super(Process, self).__init__()
self.id = id
def run(self):
time.sleep(1)
print("I'm the process with id: {}".format(self.id))
Process
Process.start Process.run
Process.start Process.run
if __name__ == '__main__':
p = Process(0)
p.start()
Process.start
p
Process.join
if __name__ == '__main__':
p = Process(0)
p.start()
p.join()
if __name__ == '__main__':
processes = Process(1), Process(2), Process(3), Process(4)
[p.start() for p in processes]
multiprocessing
multiprocessing.Pool
multiprocessing.Pool
apply apply_async map map_async
Pool.map
map
multiprocessing.Pool
multiprocessing.Pool
pool = multiprocessing.Pool()
pool = multiprocessing.Pool(processes=4)
pool.map
Pool.map
def square(x):
return x * x
inputs = [0, 1, 2, 3, 4]
outputs = pool.map(square, inputs)
Pool.map_async Pool.map AsyncResult
Pool.map
map_async
AsyncResult
AsyncResult.get
Pool.apply_async
AsyncResult
map apply_async
Executor concurrent.futures
Executor ThreadPoolExecutor
ProcessPoolExecutor
ProcessPoolExecutor
multiprocessing.Pool ProcessPoolExecutor
ThreadPoolExecutor max_workers
max_workers
ProcessPoolExecutor submit map
submit Future
Pool.map
executor = ProcessPoolExecutor(max_workers=4)
fut = executor.submit(square, 2)
# Result:
# <Future at 0x7f5b5c030940 state=running>
Future
concurrent.futures.wait concurrent.futures.as_completed
wait future
Future.result as_completed
fut1 = executor.submit(square, 2)
fut2 = executor.submit(square, 3)
wait([fut1, fut2])
# Then you can extract the results using fut1.result() and
fut2.result()
asyncio.run_in_executor
asyncio
hits/total = area_circle/area_square = pi/4
pi = 4 * hits/total
import random
samples = 1000000
hits = 0
for i in range(samples):
    x = random.uniform(-1.0, 1.0)
    y = random.uniform(-1.0, 1.0)
    if x**2 + y**2 <= 1:
        hits += 1

pi = 4.0 * hits/samples
sample
1
0 sample
sample apply_async
def sample():
    x = random.uniform(-1.0, 1.0)
    y = random.uniform(-1.0, 1.0)
    return 1 if x**2 + y**2 <= 1 else 0
pool = multiprocessing.Pool()
results_async = [pool.apply_async(sample) for i in range(samples)]
hits = sum(r.get() for r in results_async)
pi_serial pi_apply_async
pi.py
sample_multiple
def sample_multiple(samples_partial):
return sum(sample() for i in range(samples_partial))
n_tasks = 10
chunk_size = samples/n_tasks
pool = multiprocessing.Pool()
results_async = [pool.apply_async(sample_multiple, chunk_size)
for i in range(n_tasks)]
hits = sum(r.get() for r in results_async)
pi_apply_async_chunked
user real
multiprocessing
multiprocessing.Value i d f
value
shared_variable = multiprocessing.Value('f')
shared_variable.value = 0
class Process(multiprocessing.Process):
    def __init__(self, counter):
        super(Process, self).__init__()
        self.counter = counter
    def run(self):
        for i in range(1000):
            self.counter.value += 1
def main():
    counter = multiprocessing.Value('i', lock=True)
    counter.value = 0
    processes = [Process(counter) for i in range(4)]
    [p.start() for p in processes]
    [p.join() for p in processes]
    print(counter.value)
shared.py
counter
0
1 1 2
0 1
multiprocessing.Lock
acquire release
lock = multiprocessing.Lock()
class Process(multiprocessing.Process):
def run(self):
for i in range(1000):
with lock: # acquire the lock
self.counter.value += 1
# release the lock
multiprocessing
http://docs.python.org/3/library/multiprocessing.html
cython.parallel
prange
hello_parallel.pyx
square_serial
square_serial
import numpy as np
def square_serial(double[:] inp):
    cdef int i, size = inp.shape[0]
    out_np = np.empty(size, 'double')
    cdef double[:] out = out_np
    for i in range(size):
        out[i] = inp[i]*inp[i]
    return out_np
range prange prange
nogil
with nogil:
for i in prange(size):
out[i] = inp[i]*inp[i]
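Putting it together, a sketch of the full parallel function (square_parallel is an assumed name):

from cython.parallel import prange
import numpy as np

def square_parallel(double[:] inp):
    cdef int i, size = inp.shape[0]
    out_np = np.empty(size, 'double')
    cdef double[:] out = out_np
    with nogil:
        for i in prange(size):
            out[i] = inp[i]*inp[i]
    return out_np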
nogil=True prange
nogil
prange
prange
with gil
setup.py
-fopenmp
distutils.extension.Extension distutils cythonize
setup.py
hello_parallel = Extension('hello_parallel',
['hello_parallel.pyx'],
extra_compile_args=['-fopenmp'],
extra_link_args=['-fopenmp'])
setup(
name='Hello',
ext_modules = cythonize(['cevolve.pyx', hello_parallel]),
)
prange ParticleSimulator
c_evolve cevolve.pyx
Chapter 4
# cdef declarations
for i in range(nsteps):
for j in range(nparticles):
# loop body
for j in range(nparticles):
for i in range(nsteps):
# loop body
range prange
nogil
prange
numexpr
a
a_sq T.scalar
**
import theano.tensor as T
import theano as th
a = T.scalar('a')
a_sq = a ** 2
print(a_sq)
# Output:
# Elemwise{pow,no_inplace}.0
th.function
a_sq
th.function
compute_square = th.function([a], a_sq)
compute_square(2)
# Result:
# array(4.0)
compute_square
float64
dtype a
a.dtype
# Result:
# float64
T.vector
a = T.vector('a')
b = T.vector('b')
ab_sq = a**2 + b**2
compute_square = th.function([a, b], ab_sq)
pi
x y
hit_test
x = T.vector('x')
y = T.vector('y')
hit_test = x ** 2 + y ** 2 < 1
True hit_test
hits = hit_test.sum()
total = x.shape[0]
pi_est = 4 * hits/total
th.function
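The expression graph is compiled with th.function and fed with random samples; a sketch (the sample size of 30000 is an assumption):

import numpy as np

calculate_pi = th.function([x, y], pi_est)
x_val = np.random.uniform(-1, 1, 30000)
y_val = np.random.uniform(-1, 1, 30000)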
timeit
timeit.timeit calculate_pi
import timeit
res = timeit.timeit("calculate_pi(x_val, y_val)",
"from __main__ import x_val, y_val, calculate_pi", number=100000)
print(res)
# Output:
# 10.905971487998613
theano.config
openmp_elemwise_minsize
openmp
OMP_NUM_THREADS
test_theano.py
# File: test_theano.py
import numpy as np
import theano.tensor as T
import theano as th
th.config.openmp_elemwise_minsize = 1000
th.config.openmp = True
x = T.vector('x')
y = T.vector('y')
hit_test = x ** 2 + y ** 2 <= 1
hits = hit_test.sum()
misses = x.shape[0]
pi_est = 4 * hits/misses
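# Sketch of the remaining setup in test_theano.py: compile the expression
# and generate the random input samples (the sample size is an assumption).
calculate_pi = th.function([x, y], pi_est)
x_val = np.random.uniform(-1, 1, 30000)
y_val = np.random.uniform(-1, 1, 30000)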
import timeit
res = timeit.timeit("calculate_pi(x_val, y_val)",
"from __main__ import x_val, y_val,
calculate_pi", number=100000)
print(res)
OMP_NUM_THREADS
$ python test_theano.py
10.905971487998613
$ python test_theano.py
7.538279129999864
$ python test_theano.py
9.405846934998408
$ python test_theano.py
14.634153957000308
sum
hit_tests
test_theano.py
# Older version
# hits = hit_test.sum()
hits =
$ python test_theano.py
5.822126664999814
$ python test_theano.py
5.697357518001809
$ python test_theano.py
5.636914656002773
$ python test_theano.py
5.764030176000233
profile=True th.function
timeit
summary
calculate_pi.profile.summary()
profile=True
OMP_NUM_THREADS
hit_tests
config.profile
calculate_pi.profile.summary()
Function profiling
==================
Message: test_theano.py:15
Class
---
<% time> <sum %> <apply time> <time per call> <type> <#call> <#apply>
<Class name>
.... timing info by class
<% time> <sum %> <apply time> <time per call> <type> <#call> <#apply> <Op
name>
Apply
------
<% time> <sum %> <apply time> <time per call> <#call> <id> <Apply name>
... timing info by apply
tf.placeholder
import tensorflow as tf
a = tf.placeholder('float64')
tf.Session
tf.Session
Session.run
a = tf.placeholder('float64')
b = tf.placeholder('float64')
ab_sq = a**2 + b**2
x y
hit_tests tf.reduce_sum
Session inter_op_parallelism_threads
intra_op_parallelism_threads
Session
Session
test_tensorflow.py
sys.argv[1]
import tensorflow as tf
import numpy as np
import time
import sys
NUM_THREADS = int(sys.argv[1])
samples = 30000
x = tf.placeholder('float64', name='x')
y = tf.placeholder('float64', name='y')
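# Sketch of the missing statements: generate the samples and build the
# hit_tests/hits nodes with tf.reduce_sum (the exact names are assumptions).
x_data = np.random.uniform(-1, 1, samples)
y_data = np.random.uniform(-1, 1, samples)
hit_tests = x ** 2 + y ** 2 <= 1.0
hits = tf.reduce_sum(tf.cast(hit_tests, tf.int32))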
with tf.Session
(config=tf.ConfigProto
(inter_op_parallelism_threads=NUM_THREADS,
intra_op_parallelism_threads=NUM_THREADS)) as sess:
start = time.time()
for i in range(10000):
sess.run(hits, {x: x_data, y: y_data})
print(time.time() - start)
NUM_THREADS
13.059704780578613
11.938535928726196
12.783955574035645
12.158143043518066
https://aws.amazon.com/ec2
T.matrix
T.dot
N = 5000
A = T.matrix('A')
B = T.matrix('B')
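# Sketch of the missing setup: compile the matrix product and generate the inputs.
f = th.function([A, B], T.dot(A, B))
A_data = np.random.rand(N, N)
B_data = np.random.rand(N, N)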
start = time.time()
f(A_data, B_data)
config.device=gpu
THEANO_FLAGS
test_theano_matmul.py
device=cpu
tf.device
/cpu:0 /gpu:0
tf.matmul
import tensorflow as tf
import time
import numpy as np
N = 5000
A_data = np.random.rand(N, N)
B_data = np.random.rand(N, N)
# Creates a graph.
A = tf.placeholder('float32')
B = tf.placeholder('float32')
C = tf.matmul(A, B)
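To run the product on a specific device and time it, a sketch using tf.device and a session (the device string and timing loop are assumptions):

with tf.device('/gpu:0'):
    C = tf.matmul(A, B)

with tf.Session() as sess:
    start = time.time()
    sess.run(C, {A: A_data, B: B_data})
    print(time.time() - start)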
test_tensorflow_matmul.py tf.device
import numba as nb
import math
@nb.vectorize(target='cpu')
def expon_cpu(x, y):
return math.exp(x) + math.exp(y)
expon_cpu
target='cuda'
expon_gpu
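A sketch of the GPU version, which requires explicit signatures and a CUDA-capable device:

@nb.vectorize(['float32(float32, float32)'], target='cuda')
def expon_gpu(x, y):
    return math.exp(x) + math.exp(y)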
import numpy as np
import time
N = 1000000
niter = 100
a = np.random.rand(N).astype('float32')
b = np.random.rand(N).astype('float32')
# Trigger compilation
expon_cpu(a, b)
expon_gpu(a, b)
# Timing
start = time.time()
for i in range(niter):
expon_cpu(a, b)
print("CPU:", time.time() - start)
start = time.time()
for i in range(niter):
expon_gpu(a, b)
print("GPU:", time.time() - start)
#
#
#
mpi4py
https://research.google.com/archive/mapreduce.html
map reduce
conda
a b
2
dsk = {
"a" : 2,
"b" : 2,
}
dsk
sum result
dsk = {
"a" : 2,
"b" : 2,
"result": (lambda x, y: x + y, "a", "b")
}
lambda
operator.add
dask.get
import dask
dask.get
dask.array
da
da.from_array
da.from_array da.array
30 10
import numpy as np
import dask.array as da
a = np.random.rand(30)
a_da = da.from_array(a, chunks=10)
a_da dask
'array-original-4c76'
a_da.dask
dask.array.core.getarray
dict(a_da.dask)
# Result
{('array-4c76', 0): (<function dask.array.core.getarray>,
'array-original-4c76',
(slice(0, 10, None),)),
('array-4c76', 2): (<function dask.array.core.getarray>,
'array-original-4c76',
(slice(20, 30, None),)),
('array-4c76', 1): (<function dask.array.core.getarray>,
'array-original-4c76',
(slice(10, 20, None),)),
'array-original-4c76': array([ ... ])
}
a_da
da.array
N = 10000
chunksize = 1000
x_data = np.random.uniform(-1, 1, N)
y_data = np.random.uniform(-1, 1, N)
x = da.from_array(x_data, chunks=chunksize)
y = da.from_array(y_data, chunks=chunksize)
hit_test = x ** 2 + y ** 2 < 1
hits = hit_test.sum()
pi = 4 * hits / N
compute
get da.array
pi.visualize()
dask.bag.Bag
dask.dataframe.DataFrame
pandas.DataFrame
Bag Bag
from_sequence
npartitions Bag
Bag
0 99
str.split concat
1
foldby
foldby
Bag foldby
foldby
key
binop total x total
binop
initial binop
combine
initial_combine combine
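A sketch of the input collection; the two documents are assumptions chosen to match the word counts shown in the output below:

import dask.bag as db

collection = db.from_sequence(["the dog sat on the mat",
                               "the cat sat on the mat"],
                              npartitions=2)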
(collection
 .map(str.split)
 .concat()
 .map(lambda x: {"word": x, "count": 1})
 .foldby(lambda x: x["word"],
         binop=lambda total, x: total + x["count"],
         initial=0)
 .compute())
# Output:
# [('dog', 1), ('cat', 1), ('sat', 2), ('on', 2), ('mat', 2), ('the',
4)]
Bag
dask.dataframe.DataFrame DataFrame
CSV Bag
da.array DataFrame
pandas.DataFrame
DataFrame
Bag Bag DataFrame
to_dataframe to_dataframe
DataFrame words
DataFrame pandas.DataFrame
value_counts
compute
df.words.value_counts().compute()
# Result:
# the 4
# sat 2
# on 2
# mat 2
# dog 1
# cat 1
# Name: words, dtype: int64
pd.Series
value_counts
pd.Series value_counts_aggregate
array DataFrame
conda $ conda
install distributed pip $ pip
install distributed
Client
from dask.distributed import Client
client = Client()
# Result:
# <Client: scheduler='tcp://127.0.0.1:46472' processes=4 cores=4>
Client
ThreadPoolExecutor
nogil
Client
Client.map Client.submit
Client.map Client.submit
Client
Future
def square(x):
return x ** 2
fut = client.submit(square, 2)
# Result:
# <Future: status: pending, key:
square-05236e00d545104559e0cd20f94cd8ab>
client.map(square)
futs = client.map(square, [0, 1, 2, 3, 4])
# Result:
# [<Future: status: pending, key: square-
d043f00c1427622a694f518348870a2f>,
# <Future: status: pending, key:
square-9352eac1fb1f6659e8442ca4838b6f8d>,
# <Future: status: finished, type: int, key:
# square-05236e00d545104559e0cd20f94cd8ab>,
# <Future: status: pending, key:
# square-c89f4c21ae6004ce0fe5206f1a8d619d>,
# <Future: status: pending, key:
# square-a66f1c13e2a46762b092a4f2922e9db9>]
TheadPoolExecutor ProcessPoolExecutor
client.submit square(2)
client.map square(2)
Future
Future Client.gather
client.gather(futs)
# Result:
# [0, 1, 4, 9, 16]
Client
client.get
pi.compute
pi.compute(get=client.get)
dask-scheduler
dask-worker dask-scheduler
dask-worker
Client
https://www.docker.com/
Dockerfile
22b9dbc2767c260e525dcbc562b84a399a7f338fe1c06418cbe6b351c998e239
-p
http://127.0.0.1:8888
import pyspark
sc = pyspark.SparkContext('local[*]')
rdd = sc.parallelize(range(1000))
rdd.first()
# Result:
# 0
SparkContext
SparkContext
http://127.0.0.1:4040
SparkContext
Py4J
SparkContext.parallelize
0 1000
rdd = sc.parallelize(range(1000))
# Result:
# PythonRDD[3] at RDD at PythonRDD.scala:48
rdd
parallelize
rdd = sc.parallelize(range(1000), 2)
rdd.getNumPartitions() # This function will return the number of
partitions
# Result:
# 2
Chapter 6
map
map
map
collect
take
square_rdd = rdd.map(lambda x: x**2)
square_rdd.collect()
# Result:
# [0, 1, ... ]
square_rdd.take(10)
# Result:
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
parallelize zip
zip
import numpy as np
N = 10000
x = np.random.uniform(-1, 1, N)
y = np.random.uniform(-1, 1, N)
rdd_x = sc.parallelize(x)
rdd_y = sc.parallelize(y)
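A sketch of the hit test and the final estimate, where the two RDDs are zipped element-wise:

hit_rdd = (rdd_x
           .zip(rdd_y)
           .map(lambda xy: xy[0] ** 2 + xy[1] ** 2 < 1))
hits = hit_rdd.sum()
pi = 4 * hits / N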
hit_test.sum
pi
user
timestamp
users
users
import datetime
from random import choice
from uuid import uuid4

# We generate 20 users
n_users = 20
users = [uuid4() for i in range(n_users)]
start = datetime.datetime(2017, 1, 1)
end = datetime.datetime(2017, 1, 7)
entries = []
N = 10000
for i in range(N):
entries.append({
'user': choice(users),
'timestamp': random_time(start, end)
})
groupBy
groupBy
(key, group)
groupBy
first
entries_rdd = sc.parallelize(entries)
entries_rdd.groupBy(lambda x: x['user']).first()
# Result:
# (UUID('0604aab5-c7ba-4d5b-b1e0-16091052fb11'),
# <pyspark.resultiterable.ResultIterable at 0x7faced4cd0b8>)
groupBy ResultIterable
ResultIterable
(entries_rdd
.groupBy(lambda x: x['user'])
.take(5))
# Result:
# [(UUID('0604aab5-c7ba-4d5b-b1e0-16091052fb11'), 536),
# (UUID('d72c81c1-83f9-4b3c-a21a-788736c9b2ea'), 504),
# (UUID('e2e125fa-8984-4a9a-9ca1-b0620b113cdb'), 498),
# (UUID('b90acaf9-f279-430d-854f-5df74432dd52'), 561),
# (UUID('00d7be53-22c3-43cf-ace7-974689e9d54b'), 466)]
groupBy
(key, value)
mapValues
map(lambda kv: (kv[0], len(kv[1])))
mapValues(len)
reduceByKey
reduceByKey
reduceByKey
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c",
5)])
rdd.reduceByKey(lambda a, b: a + b).collect()
# Result:
# [('c', 5), ('b', 6), ('a', 4)]
reduceByKey groupBy
foldby
reduceByKey
(entries_rdd
.map(lambda x: (x['user'], 1))
.reduceByKey(lambda a, b: a + b)
.take(3))
# Result:
# [(UUID('0604aab5-c7ba-4d5b-b1e0-16091052fb11'), 536),
# (UUID('d72c81c1-83f9-4b3c-a21a-788736c9b2ea'), 504),
# (UUID('e2e125fa-8984-4a9a-9ca1-b0620b113cdb'), 498)]
reduceByKey
sortByKey
(entries_rdd
.map(lambda x: (x['timestamp'].date(), 1))
.reduceByKey(lambda a, b: a + b)
.sortByKey()
.collect())
# Result:
# [(datetime.date(2017, 1, 1), 1685),
# (datetime.date(2017, 1, 2), 1625),
# (datetime.date(2017, 1, 3), 1663),
# (datetime.date(2017, 1, 4), 1643),
# (datetime.date(2017, 1, 5), 1731),
# (datetime.date(2017, 1, 6), 1653)]
pyspark.sql
spark.sql.DataFrame
SparkSession
SparkSession DataFrame
createDataFrame createDataFrame
pandas.DataFrame
spark.sql.DataFrame
rows Row Row
pd.DataFrame x y
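A sketch of how the rows are built from the x and y samples and a SparkSession is obtained (the names are assumptions):

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()
rows = [Row(x=float(x_), y=float(y_)) for x_, y_ in zip(x, y)]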
df = spark.createDataFrame(rows)
df.show(5)
# Output:
# +-------------------+--------------------+
# | x| y|
# +-------------------+--------------------+
# |0.18432163061239137| 0.632310101419016|
# | 0.8159145525577987| -0.9578448778029829|
# |-0.6565050226033042| 0.4644773453129496|
# |-0.1566191476553318|-0.11542211978216432|
# | 0.7536730082381564| 0.26953055476074717|
# +-------------------+--------------------+
# only showing top 5 rows
spark.sql.DataFrame
selectExpr
x y pow
result = df.selectExpr('
')
result.first()
# Result:
# Row(pi=3.13976)
https://cwiki.apache.org/confluence/display/Hive/LanguageManual
GROUP BY
groupBy
0
mpi4py
COMM_WORLD
Get_rank
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

print("This is process", rank)
mpi_example.py
This is process 0
mpiexec -n
This is process 0
This is process 2
This is process 1
This is process 3
N / n_procs
n_procs
Get_size
hits_counts
reduce
root
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
import numpy as np
N = 10000
n_procs = comm.Get_size()
# Create an array
x_part = np.random.uniform(-1, 1, int(N/n_procs))
y_part = np.random.uniform(-1, 1, int(N/n_procs))
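# Sketch of the missing lines: count the hits in this process and combine
# the partial counts on the root process with reduce (MPI.SUM is the default op).
hits_counts = np.sum(x_part**2 + y_part**2 < 1)
total_counts = comm.reduce(hits_counts, root=0)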
if rank == 0:
print("Total hits:", total_counts)
print("Final result:", 4 * total_counts/N)
mpi_pi.py
mpiexec
reduce
Chapter 6
Chapter 2
scipy
scikit-learn
cffi https://cffi.readthedocs.io/en/latest/
multiprocessing.Pool ProcessPoolExecutor
nogil
mpi4py
array DataFrame
README.md
setup.py
myapp
myapp/
README.md
LICENSE
setup.py
myapp/
__init__.py
module1.py
cmodule1.pyx
module2/
__init__.py
src/
module.c
module.h
tests/
__init__.py
test_module1.py
test_module2.py
benchmarks/
__init__.py
test_module1.py
test_module2.py
docs/
tools/
README.md
LICENSE
setuptools setup.py
setup.py
myapp
cmodule1.pyx
.c .h src/
myapp
tests/
unittest pytest
tests/ myapp
benchmarks
docs/
sphinx tools/
Y X
venv
venv
setup.py pip
scipy
scikit-learn
venv
venv
pip
conda
conda create
-n -n --name
3.5
source activate
which
myenv conda
env remove
conda
conda-forge
conda-forge -c --channel
conda search
scipy
anaconda-client
anaconda
chemview
-c
Chapter 8
https://www.docker.com/
docker run
docker run
-i
-t <image name>
ubuntu:16.04
/bin/bash
exit
-p -d
pyspark -d
-p <host_port>:<guest_port>
docker ps
-a
docker ps
585f53e77ce9 pensive_hamilton
docker start
docker attach
docker run
docker run
https://hub.docker.com/
docker commit
FROM
jupyter/scipy-notebook
https://hub.docker.com/r/jupyter/scipy-notebook/
RUN ENV
openjdk-7-jre-headless
USER
jupyter/scipy-notebook
Jupyter Project <jupyter@googlegroups.com>
root
# Spark dependencies
APACHE_SPARK_VERSION 2.0.2
apt-get -y update &&
apt-get install -y --no-install-recommends
openjdk-7-jre-headless &&
apt-get clean &&
rm -rf /var/lib/apt/lists/*
cd /tmp &&
wget -q http://d3kbcqa49mib13.cloudfront.net/spark-
${APACHE_SPARK_VERSION}-bin-hadoop2.6.tgz &&
echo "ca39ac3edd216a4d568b316c3af00199
b77a52d05ecf4f9698da2bae37be998a
*spark-${APACHE_SPARK_VERSION}-bin-hadoop2.6.tgz" |
sha256sum -c - &&
tar xzf spark-${APACHE_SPARK_VERSION}
-bin-hadoop2.6.tgz -C /usr/local &&
rm spark-${APACHE_SPARK_VERSION}-bin-hadoop2.6.tgz
cd /usr/local && ln -s spark-${APACHE_SPARK_VERSION}
-bin-hadoop2.6 spark
$NB_USER
-t
pyspark
jupyter/scipy-notebook
pyspark
https://travis-ci.org/
.travis.yml
https://travis-ci.org/
.travis.yml
python install
pip
script
python
- "2.7"
- "3.5"
- sudo apt-get update
- if [[ "$TRAVIS_PYTHON_VERSION" == "2.7" ]]; then
wget https://repo.continuum.io/miniconda/
Miniconda2-latest-Linux-x86_64.sh -O miniconda.sh;
else
wget https://repo.continuum.io/miniconda/
Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh;
fi
- bash miniconda.sh -b -p $HOME/miniconda
- export PATH="$HOME/miniconda/bin:$PATH"
- hash -r
- conda config --set always_yes yes --set changeps1 no
- conda update -q conda
pytest tests/
pytest benchmarks/
.travis.yml