Skip to content

Commit dc19418

Browse files
committed
MOTOR-86 Can't return values from internal coros.
Consequences of PEP 479. Motor not only supports users' coroutines, it uses coroutines to implement of some of its own features, like client.open() and gridfs.put(). Some of those coroutines return values. In Tornado and Python 2.6, Motor's internal coros could return values with "raise gen.Return(value)", and in Python 2.7 could alternatively use "raise StopIteration(value)". The latter also works in Python 3.3+ and asyncio, so this worked everywhere: @motor_coroutine def f(): raise framework.return_value(value) ... where framework.return_value is gen.Return or StopIteration. In Python 3.5 with asyncio, however, raising StopIteration to return from a coroutine is prohibited (PEP 492 says, "PEP 479 is enabled by default for coroutines"). Only "return value" works. It's possible there's a workaround for now, but it will break in Python 3.7 (PEP 479 transition plan: "Python 3.7: Enable new semantics everywhere"). For the future's sake, stop returning values from Motor's internal coroutes and use callbacks and explicit Futures to implement returns. Still TODO: reimplement parallel_collection_scan.
1 parent 5ed98b6 commit dc19418

File tree

7 files changed

+222
-110
lines changed

7 files changed

+222
-110
lines changed

Diff for: motor/core.py

+49-28
Original file line numberDiff line numberDiff line change
@@ -522,8 +522,7 @@ def __init__(self, *args, **kwargs):
522522
# 'MotorClient' that create_class_with_framework created.
523523
super(self.__class__, self).__init__(io_loop, *args, **kwargs)
524524

525-
@motor_coroutine
526-
def open(self):
525+
def open(self, callback=None):
527526
"""Connect to the server.
528527
529528
Takes an optional callback, or returns a Future that resolves to
@@ -548,8 +547,10 @@ def open(self):
548547
:class:`MotorClient` now opens itself on demand, calling ``open``
549548
explicitly is now optional.
550549
"""
551-
yield self._framework.yieldable(self._ensure_connected())
552-
self._framework.return_value(self)
550+
return self._framework.future_or_callback(self._ensure_connected(),
551+
callback,
552+
self.get_io_loop(),
553+
self)
553554

554555
def _get_member(self):
555556
# TODO: expose the PyMongo Member, or otherwise avoid this.
@@ -600,8 +601,7 @@ def __init__(self, *args, **kwargs):
600601
# 'MotorClient' that create_class_with_framework created.
601602
super(self.__class__, self).__init__(io_loop, *args, **kwargs)
602603

603-
@motor_coroutine
604-
def open(self):
604+
def open(self, callback=None):
605605
"""Connect to the server.
606606
607607
Takes an optional callback, or returns a Future that resolves to
@@ -626,11 +626,10 @@ def open(self):
626626
:class:`MotorReplicaSetClient` now opens itself on demand, calling
627627
``open`` explicitly is now optional.
628628
"""
629-
yield self._framework.yieldable(self._ensure_connected(True))
630-
primary = self._get_member()
631-
if not primary:
632-
raise pymongo.errors.AutoReconnect('no primary is available')
633-
self._framework.return_value(self)
629+
return self._framework.future_or_callback(self._ensure_connected(),
630+
callback,
631+
self.get_io_loop(),
632+
self)
634633

635634
def _get_member(self):
636635
# TODO: expose the PyMongo RSC members, or otherwise avoid this.
@@ -1218,18 +1217,18 @@ def fetch_next(self):
12181217
12191218
.. _`large batches`: http://docs.mongodb.org/manual/core/read-operations/#cursor-behaviors
12201219
"""
1221-
future = self._framework.get_future(self.get_io_loop())
1222-
12231220
if not self._buffer_size() and self.alive:
12241221
# Return the Future, which resolves to number of docs fetched or 0.
12251222
return self._get_more()
12261223
elif self._buffer_size():
1224+
future = self._framework.get_future(self.get_io_loop())
12271225
future.set_result(True)
12281226
return future
12291227
else:
12301228
# Dead
1229+
future = self._framework.get_future(self.get_io_loop())
12311230
future.set_result(False)
1232-
return future
1231+
return future
12331232

12341233
def next_object(self):
12351234
"""Get a document from the most recently fetched batch, or ``None``.
@@ -1320,8 +1319,7 @@ def _each_got_more(self, callback, future):
13201319
self.get_io_loop(),
13211320
functools.partial(callback, None, None))
13221321

1323-
@motor_coroutine
1324-
def to_list(self, length):
1322+
def to_list(self, length, callback=None):
13251323
"""Get a list of documents.
13261324
13271325
.. testsetup:: to_list
@@ -1370,24 +1368,47 @@ def to_list(self, length):
13701368
raise pymongo.errors.InvalidOperation(
13711369
"Can't call to_list on tailable cursor")
13721370

1373-
the_list = []
1374-
collection = self.collection
1375-
fix_outgoing = collection.database.delegate._fix_outgoing
1371+
future = self._framework.get_future(self.get_io_loop())
1372+
1373+
# Run future_or_callback's type checking before we change anything.
1374+
retval = self._framework.future_or_callback(future,
1375+
callback,
1376+
self.get_io_loop())
13761377

13771378
self.started = True
1378-
while True:
1379-
yield self._framework.yieldable(self._refresh())
1380-
while (self._buffer_size() > 0 and
1381-
(length is None or len(the_list) < length)):
1379+
the_list = []
1380+
self._refresh(callback=functools.partial(self._to_list,
1381+
length,
1382+
the_list,
1383+
future))
13821384

1383-
doc = self._data().popleft()
1384-
the_list.append(fix_outgoing(doc, collection))
1385+
return retval
1386+
1387+
def _to_list(self, length, the_list, future, result, error):
1388+
if error:
1389+
# TODO: lost exc_info
1390+
future.set_exception(error)
1391+
else:
1392+
collection = self.collection
1393+
fix_outgoing = collection.database.delegate._fix_outgoing
1394+
1395+
if length is None:
1396+
n = result
1397+
else:
1398+
n = min(length, result)
1399+
1400+
for _ in range(n):
1401+
the_list.append(fix_outgoing(self._data().popleft(),
1402+
collection))
13851403

13861404
reached_length = (length is not None and len(the_list) >= length)
13871405
if reached_length or not self.alive:
1388-
break
1389-
1390-
self._framework.return_value(the_list)
1406+
future.set_result(the_list)
1407+
else:
1408+
self._refresh(callback=functools.partial(self._to_list,
1409+
length,
1410+
the_list,
1411+
future))
13911412

13921413
def get_io_loop(self):
13931414
return self.collection.get_io_loop()

Diff for: motor/frameworks/asyncio.py

+44-5
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,34 @@ def check_event_loop(loop):
3939
"not %r" % loop)
4040

4141

42-
def return_value(value):
43-
# In Python 3.3, StopIteration can accept a value.
44-
raise StopIteration(value)
45-
46-
4742
def get_future(loop):
4843
return asyncio.Future(loop=loop)
4944

5045

46+
_DEFAULT = object()
47+
48+
49+
def future_or_callback(future, callback, loop, return_value=_DEFAULT):
50+
if callback:
51+
raise NotImplementedError("Motor with asyncio prohibits callbacks")
52+
53+
if return_value is _DEFAULT:
54+
return future
55+
56+
chained = asyncio.Future(loop=loop)
57+
58+
def done_callback(_future):
59+
try:
60+
result = _future.result()
61+
chained.set_result(result if return_value is _DEFAULT
62+
else return_value)
63+
except Exception as exc:
64+
chained.set_exception(exc)
65+
66+
future.add_done_callback(done_callback)
67+
return chained
68+
69+
5170
def is_future(f):
5271
return isinstance(f, asyncio.Future)
5372

@@ -92,6 +111,26 @@ def close_resolver(resolver):
92111
coroutine = asyncio.coroutine
93112

94113

114+
def pymongo_class_wrapper(f, pymongo_class):
115+
"""Executes the coroutine f and wraps its result in a Motor class.
116+
117+
See WrapAsync.
118+
"""
119+
@functools.wraps(f)
120+
@asyncio.coroutine
121+
def _wrapper(self, *args, **kwargs):
122+
result = yield from f(self, *args, **kwargs)
123+
124+
# Don't call isinstance(), not checking subclasses.
125+
if result.__class__ == pymongo_class:
126+
# Delegate to the current object to wrap the result.
127+
return self.wrap(result)
128+
else:
129+
return result
130+
131+
return _wrapper
132+
133+
95134
def yieldable(future):
96135
# TODO: really explain.
97136
return next(iter(future))

Diff for: motor/frameworks/tornado.py

+58-4
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,6 @@ def check_event_loop(loop):
5151
"io_loop must be instance of IOLoop, not %r" % loop)
5252

5353

54-
def return_value(value):
55-
raise gen.Return(value)
56-
57-
5854
# Beginning in Tornado 4, TracebackFuture is a deprecated alias for Future.
5955
# Future-proof Motor in case TracebackFuture is some day removed. Remove this
6056
# Future-proofing once we drop support for Tornado 3.
@@ -68,6 +64,44 @@ def get_future(loop):
6864
return _TornadoFuture()
6965

7066

67+
_DEFAULT = object()
68+
69+
70+
def future_or_callback(future, callback, io_loop, return_value=_DEFAULT):
71+
if callback:
72+
if not callable(callback):
73+
raise callback_type_error
74+
75+
# Motor's callback convention is "callback(result, error)".
76+
def done_callback(_future):
77+
try:
78+
result = _future.result()
79+
callback(result if return_value is _DEFAULT else return_value,
80+
None)
81+
except Exception as exc:
82+
callback(None, exc)
83+
84+
future.add_done_callback(done_callback)
85+
86+
elif return_value is not _DEFAULT:
87+
chained = _TornadoFuture()
88+
89+
def done_callback(_future):
90+
try:
91+
result = _future.result()
92+
chained.set_result(result if return_value is _DEFAULT
93+
else return_value)
94+
except Exception as exc:
95+
# TODO: exc_info
96+
chained.set_exception(exc)
97+
98+
future.add_done_callback(done_callback)
99+
return chained
100+
101+
else:
102+
return future
103+
104+
71105
def is_future(f):
72106
return isinstance(f, concurrent.Future)
73107

@@ -137,6 +171,26 @@ def _callback(_future):
137171
return wrapper
138172

139173

174+
def pymongo_class_wrapper(f, pymongo_class):
175+
"""Executes the coroutine f and wraps its result in a Motor class.
176+
177+
See WrapAsync.
178+
"""
179+
@functools.wraps(f)
180+
@coroutine
181+
def _wrapper(self, *args, **kwargs):
182+
result = yield f(self, *args, **kwargs)
183+
184+
# Don't call isinstance(), not checking subclasses.
185+
if result.__class__ == pymongo_class:
186+
# Delegate to the current object to wrap the result.
187+
raise gen.Return(self.wrap(result))
188+
else:
189+
raise gen.Return(result)
190+
191+
return _wrapper
192+
193+
140194
def yieldable(future):
141195
# TODO: really explain.
142196
return future

Diff for: motor/metaprogramming.py

+7-15
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,15 @@ def call_method():
107107

108108

109109
def motor_coroutine(f):
110-
"""Used by Motor class to mark functions as coroutines.
110+
"""Used by Motor classes to mark functions as coroutines.
111111
112112
create_class_with_framework will decorate the function with a framework-
113113
specific coroutine decorator, like asyncio.coroutine or Tornado's
114114
gen.coroutine.
115+
116+
You cannot return a value from a motor_coroutine, the syntax differences
117+
between Tornado on Python 2 and asyncio with Python 3.5 are impossible to
118+
bridge.
115119
"""
116120
f._is_motor_coroutine = _coro_token
117121
return f
@@ -195,20 +199,8 @@ def __init__(self, prop, original_class):
195199
def create_attribute(self, cls, attr_name):
196200
async_method = self.property.create_attribute(cls, attr_name)
197201
original_class = self.original_class
198-
199-
@functools.wraps(async_method)
200-
@cls._framework.coroutine
201-
def wrapper(self, *args, **kwargs):
202-
future = async_method(self, *args, **kwargs)
203-
result = yield cls._framework.yieldable(future)
204-
205-
# Don't call isinstance(), not checking subclasses.
206-
if result.__class__ == original_class:
207-
# Delegate to the current object to wrap the result.
208-
cls._framework.return_value(self.wrap(result))
209-
else:
210-
cls._framework.return_value(result)
211-
202+
wrapper = cls._framework.pymongo_class_wrapper(async_method,
203+
original_class)
212204
if self.doc:
213205
wrapper.__doc__ = self.doc
214206

0 commit comments

Comments
 (0)