Creating Process-wise ThreadLocalODMSession and MappedClass #2
Comments
I don't think I've tried this exactly, but a few ideas: when you declare the |
from ming import create_datastore
from ming.odm import ThreadLocalODMSession
from ming import schema
from ming.odm import MappedClass
from ming.odm import FieldProperty, ForeignIdProperty
session = ThreadLocalODMSession(
bind=create_datastore('mongodb://localhost:27017/test_1')
)
class WikiPage(MappedClass):
    class __mongometa__:
        session = session
        name = 'wiki_page'

    _id = FieldProperty(schema.ObjectId)
    title = FieldProperty(schema.String(required=True))
    text = FieldProperty(schema.String(if_missing=''))
# Insert into db 'test_1'
wp = WikiPage(title='This is first title', text='This is the first text')
session.flush()
# Insert into db 'test_2'
session.bind = create_datastore('mongodb://localhost:27017/test_2')
wp = WikiPage(title='This is second title', text='This is the second text')
session.flush()
I expected the above code snippet to insert into the test_2 db after I changed the bind to a new datastore connection, but this didn't happen. Am I missing anything here?
@KshitizGIT session is just a thread-local wrapper around the real session. So I think that what you are looking for is |
Btw my suggestion would be to use the |
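To illustrate the "thread-local wrapper" point, here is a toy sketch in plain Python. It is not ming's actual code: `RealSession` and `ThreadLocalProxy` are hypothetical stand-ins, and the sketch assumes the wrapper forwards attribute reads to the per-thread session but does not forward attribute writes. Under that assumption, assigning `bind` on the wrapper only shadows the attribute on the wrapper itself, and the real session keeps its old bind.

```python
import threading


class RealSession:
    """Hypothetical stand-in for the real per-thread session."""

    def __init__(self, bind):
        self.bind = bind


class ThreadLocalProxy:
    """Toy proxy: one RealSession per thread, attribute reads are forwarded."""

    def __init__(self, bind):
        self._bind = bind
        self._local = threading.local()

    def _get(self):
        # Lazily create the real session for the current thread.
        if not hasattr(self._local, "value"):
            self._local.value = RealSession(self._bind)
        return self._local.value

    def __getattr__(self, name):
        # Only called when normal lookup on the proxy itself fails.
        return getattr(self._get(), name)


proxy = ThreadLocalProxy(bind="test_1")

# Writing on the proxy creates an attribute on the proxy, not on the real session.
proxy.bind = "test_2"
inner_after_proxy_write = proxy._get().bind

# Writing on the wrapped object changes what the real session uses.
proxy._get().bind = "test_3"
inner_after_direct_write = proxy._get().bind
```

In this toy model the first assignment has no effect on the wrapped session, which would be consistent with the second insert still going to test_1 in the snippet above.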
@KshitizGIT I ran into this issue too.
def fset(self, v): self.impl.bind = v
def fget(self): return self.impl.bind
type(s._get()).bind = property(fget=fget, fset=fset)
s._get().bind = Session(create_datastore('mongodb://127.0.0.1:27017/s'))
I didn't see any difference (I tested only through query, though). Then I figured out the solution:
from ming.odm import Mapper
session2 = ThreadLocalODMSession(bind=create_datastore('mongodb://127.0.0.1:27017/test'))
for _mapper in Mapper.all_mappers():
    _mapper.session = session2
    _mapper.mapped_class.query.session = session2
    _mapper._compiled = False
    _mapper.compile()
    _mapper.session.ensure_indexes(_mapper.collection)
I double checked this is working, the
I also had some trouble trying what's described here https://ming.readthedocs.io/en/latest/baselevel.html#other-sessions (another issue should be created for that).
Anyway, there's something else weird. Here's a minimal example that shows the issue:
Python 3.10.2 (main, Jan 15 2022, 19:56:27) [GCC 11.1.0]
Type 'copyright', 'credits' or 'license' for more information
IPython 8.1.1 -- An enhanced Interactive Python. Type '?' for help.
In [1]: from ming import schema, create_datastore
...: from ming.odm import MappedClass, Mapper, ThreadLocalODMSession
...: from ming.odm import FieldProperty, ForeignIdProperty
In [2]: from os import getpid
In [3]: import multiprocessing
In [4]: from bson import ObjectId
In [5]: s = ThreadLocalODMSession(bind=create_datastore('mongodb://db:27017/mingtest'))
In [6]: def adapt_to_session(s):
...:     for _mapper in Mapper.all_mappers():
...:         _mapper.session = s
...:         _mapper.mapped_class.query.session = s
...:         _mapper._compiled = False
...:         _mapper.compile()
...:         _mapper.session.ensure_indexes(_mapper.collection)
...:
In [7]: class TestCol(MappedClass):
...:     class __mongometa__:
...:         session = s
...:         name = 'testcol'
...:
...:     _id = FieldProperty(schema.ObjectId)
...:     a = FieldProperty(schema.Int)
...:     b = FieldProperty(schema.Int)
...:
In [8]: def target(_id, _type):
...:     s = ThreadLocalODMSession(bind=create_datastore('mongodb://db:27017/mingtest'))
...:     adapt_to_session(s)
...:     t = TestCol.query.get(_id)
...:     if _type == 'A':
...:         t.a = getpid()
...:     else:
...:         t.b = getpid()
...:     s.flush_all()
...:     s.close()
...:
In [9]: for i in range(100):
...:     _id = TestCol(a=0, b=0)._id
...:     s.flush_all()
...:     proc1 = multiprocessing.Process(target=target, args=(_id, 'A'))
...:     proc2 = multiprocessing.Process(target=target, args=(_id, 'B'))
...:     proc1.start()
...:     proc2.start()
...:
In [10]: for i in range(100):
...:     _id = TestCol(a=0, b=0)._id
...:     s.flush_all()
...:     proc1 = multiprocessing.Process(target=target, args=(_id, 'A'))
...:     proc2 = multiprocessing.Process(target=target, args=(_id, 'B'))
...:     proc1.start()
...:     proc2.start()
...:
In [11]:
And it seems to run fine.
> db.testcol.find({$or: [{a: 0}, {b: 0}]})
{ "_id" : ObjectId("6232017bb7957cba03f5f295"), "a" : 0, "b" : 404322 }
{ "_id" : ObjectId("6232017cb7957cba03f5f2a2"), "a" : 0, "b" : 404484 }
{ "_id" : ObjectId("6232017cb7957cba03f5f2a5"), "a" : 0, "b" : 404544 }
{ "_id" : ObjectId("6232017cb7957cba03f5f2b4"), "a" : 404745, "b" : 0 }
{ "_id" : ObjectId("6232017db7957cba03f5f2bf"), "a" : 0, "b" : 404896 }
{ "_id" : ObjectId("6232017db7957cba03f5f2d6"), "a" : 405192, "b" : 0 }
{ "_id" : ObjectId("6232017db7957cba03f5f2d8"), "a" : 405217, "b" : 0 }
{ "_id" : ObjectId("6232017db7957cba03f5f2df"), "a" : 0, "b" : 405317 }
{ "_id" : ObjectId("6232017eb7957cba03f5f2e7"), "a" : 405414, "b" : 0 }
{ "_id" : ObjectId("6232017eb7957cba03f5f2e8"), "a" : 405427, "b" : 0 }
{ "_id" : ObjectId("6232017eb7957cba03f5f2ec"), "a" : 405477, "b" : 0 }
{ "_id" : ObjectId("6232017eb7957cba03f5f2f5"), "a" : 0, "b" : 405619 }
{ "_id" : ObjectId("62320199b7957cba03f5f2f8"), "a" : 405694, "b" : 0 }
{ "_id" : ObjectId("62320199b7957cba03f5f2f9"), "a" : 0, "b" : 405709 }
{ "_id" : ObjectId("62320199b7957cba03f5f2fc"), "a" : 405736, "b" : 0 }
{ "_id" : ObjectId("62320199b7957cba03f5f2fe"), "a" : 405772, "b" : 0 }
{ "_id" : ObjectId("6232019ab7957cba03f5f307"), "a" : 0, "b" : 405894 }
{ "_id" : ObjectId("6232019ab7957cba03f5f309"), "a" : 0, "b" : 405908 }
{ "_id" : ObjectId("6232019ab7957cba03f5f30d"), "a" : 0, "b" : 405964 }
{ "_id" : ObjectId("6232019ab7957cba03f5f311"), "a" : 406006, "b" : 0 }
Type "it" for more
I can't figure out why this is happening; I feel stuck. I am using versions: |
In [13]: def target(_id, _type):
...:     db = MongoClient()
...:     db.testdb.testcol.update_one({'_id': _id}, update={"$set": {_type: getpid()}})
...:
In [14]: for i in range(100):
...:     _id = db.testdb.testcol.insert_one({'a': 0, 'b': 0}).inserted_id
...:     proc1 = multiprocessing.Process(target=target, args=(_id, 'a'))
...:     proc2 = multiprocessing.Process(target=target, args=(_id, 'b'))
...:     proc1.start()
...:     proc2.start()
With pymongo only it works fine: every document ends up with both fields set.
Making assumptions based on the source code:
How can we overcome this issue then?
def doc_to_set(doc):
    def to_hashable(v):
        if isinstance(v, list):
            return tuple((to_hashable(sv) for sv in v))
        elif isinstance(v, dict):
            return tuple(((to_hashable(k), to_hashable(sv))
                          for k, sv in sorted(v.items())))
        elif hasattr(v, '__hash__'):
            return v
        else:
            return v
    return set((k, to_hashable(v)) for k, v in doc.copy().items())
fields = tuple(set((k for k, v in
                    doc_to_set(state.original_document)
                    ^ doc_to_set(state.document))))
The ming test suite is now saying OK, but it's not... When two processes change the same field, the latter wins. So, before this change, atomic operations and uow were always in conflict. I'm preparing a PR out of this |
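For readers following along, here is a small, self-contained sketch of the idea behind the diff above, using plain dicts instead of MongoDB. It assumes (this is not confirmed by the thread) that the lost updates come from each process writing back its whole in-memory document; `write_whole`, `write_changed`, `changed_fields` and `simulate` are hypothetical helpers made up for illustration, and `doc_to_set` is a simplified variant of the function quoted above.

```python
def doc_to_set(doc):
    """Flatten a document into a set of hashable (key, value) pairs."""
    def to_hashable(v):
        if isinstance(v, list):
            return tuple(to_hashable(sv) for sv in v)
        if isinstance(v, dict):
            return tuple((to_hashable(k), to_hashable(sv))
                         for k, sv in sorted(v.items()))
        return v
    return {(k, to_hashable(v)) for k, v in doc.items()}


def changed_fields(original, current):
    """Top-level keys whose value differs between the two documents."""
    return sorted({k for k, _ in doc_to_set(original) ^ doc_to_set(current)})


def write_whole(store, original, current):
    """Overwrite the stored document with the writer's full copy."""
    store.clear()
    store.update(current)


def write_changed(store, original, current):
    """Write only the fields this writer actually modified (like a $set)."""
    for key in changed_fields(original, current):
        if key in current:
            store[key] = current[key]
        else:
            store.pop(key, None)


def simulate(write):
    """Two writers load the same document, each edits one field, then both write."""
    store = {"_id": 1, "a": 0, "b": 0}
    loaded_a, loaded_b = dict(store), dict(store)
    edited_a = dict(loaded_a, a=111)
    edited_b = dict(loaded_b, b=222)
    write(store, loaded_a, edited_a)
    write(store, loaded_b, edited_b)
    return store


whole = simulate(write_whole)      # second writer clobbers the first writer's field
partial = simulate(write_changed)  # both fields survive
```

With whole-document writes the first writer's change to `a` is lost, which looks like the rows in the query output above where one of the two fields is still 0. As noted in the comment, writing only the changed fields does not help when both writers change the same field: the later write still wins.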
class WikiPage(MappedClass):
    class __mongometa__:
        session = session
        name = 'wiki_page'
I am developing a multiprocess app. For each process, I would like to create a new ThreadLocalODMSession.
Given the above declarative method, how would I bind the MappedClass to a different session?
I have already tried numerous ways but none seem to work.