Fixing concurrency problems in history counter

- riak datatype for both riak modes
- old behaviour for sql with transaction

Change-Id: Ie89629e1cf8f02c8af532f4d5b53dced7059c3fc
This commit is contained in:
Jedrzej Nowak 2016-01-18 19:41:33 +01:00
parent aea48b733e
commit 72362bb803
4 changed files with 49 additions and 6 deletions

View File

@ -5,3 +5,4 @@ solar_db: sqlite:////tmp/solar.db
# solar_db: riak://10.0.0.2:8087
riak_ensemble: False
lock_bucket_type: ''
counters_bucket_type: ''

View File

@ -39,6 +39,11 @@
line: "log_file: /var/log/solar/solar.log"
state: present
create: yes
- lineinfile:
dest: /.solar_config_override
line: "counter_bucket_type: counters"
state: present
create: yes
- lineinfile:
dest: /home/vagrant/.bashrc
line: export PYTHONWARNINGS="ignore"
@ -59,4 +64,11 @@
- name: start riak container
shell: docker-compose up -d riak chdir=/vagrant
# preconfigure docker container
# add counters datatype etc.
- shell: timeout 60 docker exec vagrant_riak_1 riak-admin wait_for_service riak_kv
- shell: timeout 10 docker exec vagrant_riak_1 riak-admin bucket-type create counters '{"props":{"datatype":"counter"}}'
ignore_errors: yes
- shell: timeout 10 docker exec vagrant_riak_1 riak-admin bucket-type activate counters
- include: tasks/celery_init.yaml

View File

@ -28,6 +28,7 @@ C.celery_broker = 'sqla+sqlite:////tmp/celery.db'
C.celery_backend = 'db+sqlite:////tmp/celery.db'
C.riak_ensemble = False
C.lock_bucket_type = None
C.counter_bucket_type = None
C.log_file = 'solar.log'

View File

@ -36,6 +36,7 @@ from solar.dblayer.model import NONE
from solar.dblayer.model import SingleIndexCache
from solar.dblayer.model import StrInt
from solar.utils import detect_input_schema_by_value
from solar.utils import parse_database_conn
from solar.utils import solar_map
@ -1060,7 +1061,8 @@ system log
5. keep order of history
"""
_connection, _connection_details = parse_database_conn(C.solar_db)
if _connection.mode == 'sqlite':
class NegativeCounter(Model):
count = Field(int, default=int)
@ -1069,6 +1071,33 @@ class NegativeCounter(Model):
self.count -= 1
self.save()
return self.count
else:
class NegativeCounter(Model):
bucket_type = C.counter_bucket_type
def next(self):
ro = self._riak_object
ro.decrement(1)
ro.store()
val = ro.value
return val
@property
def count(self):
return self._riak_object.value
@classmethod
def get_or_create(cls, key):
return cls.get(key)
@classmethod
def get(cls, key):
try:
return cls._c.obj_cache.get(key)
except KeyError:
riak_object = cls.bucket.get(key)
return cls.from_riakobj(riak_object)
class LogItem(Model):