-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest2-objects.py
More file actions
executable file
·70 lines (46 loc) · 1.27 KB
/
test2-objects.py
File metadata and controls
executable file
·70 lines (46 loc) · 1.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/python
import os, sys, time
sys.path.append('..')
import rpythonic
################################
rpy = rpythonic.RPython('test2')
@rpy.object
class MyRpyObject(object):
def __init__(self, x=.0, y=.0, z=.0):
self.x = x; self.y = y; self.z = z
def add( self, x=.0,y=.0,z=.0 ):
print( 'add( %s, %s, %s )' %(x,y,z) )
self.x += x; self.y += y; self.z += z
def sum( self ): return self.x + self.y + self.z
def show( self ): print( '<%s %s %s>' %(self.x, self.y, self.z))
rpy.cache(refresh=1)
o = MyRpyObject(.99, 0.33, 0.45)
print( o.x )
o.add( 1.1, 2.2, 3.3 )
print( o.x )
o.show()
if '--thread-test' in sys.argv:
import thread
lock = thread.allocate_lock()
GO = True
def ctypes_thread():
print( 'thread start' )
while GO:
lock.acquire()
o.add( 0.1, 0.2, 0.3 )
lock.release()
print( 'thread exit' )
thread.start_new_thread( ctypes_thread, () )
for i in range(10000*10):
# getting values from _pointer.contents is thread safe even without locks
print o.x, o.y, o.z
lock.acquire()
o.show() # not thread safe
lock.release()
GO = False
print('----------------test exit----------------')
os.abort()
# test proves garbage collectors thread safe from python locks:
# . ref (reference counting)
# . boehm
## unsafe: hybrid