Skip to content

Commit

Permalink
Merge pull request #149 from wooey/realtime-threading
Browse files Browse the repository at this point in the history
Use threads instead of processes for real time updates
  • Loading branch information
Chris7 authored Sep 2, 2016
2 parents c71da7a + 208d4b5 commit 15a56fe
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 94 deletions.
12 changes: 1 addition & 11 deletions appveyor.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
environment:
PYTHONIOENCODING: "utf-8"
matrix:
- PYTHON: "C:/Python27"
DJANGO: "Django==1.6.*"
- PYTHON: "C:/Python27"
DJANGO: "Django==1.7.*"
- PYTHON: "C:/Python27"
Expand All @@ -12,8 +10,6 @@ environment:
- PYTHON: "C:/Python27"
DJANGO: "Django"

- PYTHON: "C:/Python34"
DJANGO: "Django==1.6.*"
- PYTHON: "C:/Python34"
DJANGO: "Django==1.7.*"
- PYTHON: "C:/Python34"
Expand Down Expand Up @@ -52,10 +48,4 @@ build: off
test_script:
- "%PYTHON%/Scripts/pip.exe --version"
- "%PYTHON%/Scripts/nosetests.exe --with-coverage --cover-erase --cover-package=wooey tests"
- ps:
if($env:DJANGO -eq "Django==1.6.*"){
"%PYTHON%/Scripts/django-admin test --settings=wooey.test_settings wooey.tests"
}
else {
"%PYTHON%/Scripts/django-admin.exe test --settings=wooey.test_settings wooey.tests"
}
- "%PYTHON%/Scripts/django-admin test --settings=wooey.test_settings wooey.tests"
12 changes: 6 additions & 6 deletions wooey/backend/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ def valid_user(obj, user):
ret['error'] = _('You are not permitted to use this script')
if not groups and obj.is_active:
ret['valid'] = True
if obj.is_active is True:
if obj.is_active == True:
if set(list(user.groups.all())) & set(list(groups)):
ret['valid'] = True
ret['display'] = 'disabled' if wooey_settings.WOOEY_SHOW_LOCKED_SCRIPTS else 'hide'
Expand Down Expand Up @@ -387,12 +387,12 @@ def get_file_info(filepath):
# returns info about the file
filetype, preview = False, None
tests = [('tabular', test_delimited), ('fasta', test_fastx), ('image', test_image)]
while filetype is False and tests:
while filetype == False and tests:
ptype, pmethod = tests.pop()
filetype, preview = pmethod(filepath)
filetype = ptype if filetype else filetype
preview = None if filetype is False else preview
filetype = None if filetype is False else filetype
preview = None if filetype == False else preview
filetype = None if filetype == False else filetype
try:
json_preview = json.dumps(preview)
except:
Expand Down Expand Up @@ -451,11 +451,11 @@ def test_fastx(filepath):
break
if not row.strip():
continue
if found_caret is False and row[0] != '>':
if found_caret == False and row[0] != '>':
if row[0] == ';':
continue
break
elif found_caret is False and row[0] == '>':
elif found_caret == False and row[0] == '>':
found_caret = True
if row and row[0] == '>':
if seq:
Expand Down
4 changes: 2 additions & 2 deletions wooey/models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def get_realtime_key(self):

def update_realtime(self, stdout='', stderr='', delete=False):
wooey_cache = wooey_settings.WOOEY_REALTIME_CACHE
if delete is False and wooey_cache is None:
if delete == False and wooey_cache is None:
self.stdout = stdout
self.stderr = stderr
self.save()
Expand Down Expand Up @@ -483,7 +483,7 @@ def value(self, value):
elif field == self.INTEGER:
value = self.WOOEY_FIELD_MAP[field](value) if isinstance(value, int) or str(value).isdigit() else None
elif field == self.BOOLEAN:
if value is None or value is False:
if value is None or value == False:
value = None
if value:
value = True
Expand Down
2 changes: 1 addition & 1 deletion wooey/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def script_version_postsave(instance, created, **kwargs):
instance._script_upgrade = False
instance._script_cl_creation = False
instance._rename_script = False
if res['valid'] is False:
if res['valid'] == False:
# delete the model on exceptions.
# TODO: use django messages backend to propogate this message to the admin
instance.delete()
Expand Down
69 changes: 44 additions & 25 deletions wooey/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import sys
import traceback

from threading import Thread

from django.utils.text import get_valid_filename
from django.core.files import File
from django.conf import settings
Expand All @@ -18,11 +20,10 @@

from . import settings as wooey_settings

from billiard import Process, Queue
try:
from Queue import Empty
from Queue import Empty, Queue
except ImportError:
from queue import Empty # python 3.x
from queue import Empty, Queue # python 3.x

ON_POSIX = 'posix' in sys.builtin_module_names

Expand All @@ -31,28 +32,32 @@

def enqueue_output(out, q):
for line in iter(out.readline, b''):
q.put(line)
out.close()
q.put(line.decode('utf-8'))
try:
out.close()
except IOError:
pass


def output_monitor_queue(out):
q = Queue()
p = Process(target=enqueue_output, args=(out, q))
def output_monitor_queue(queue, out):
p = Thread(target=enqueue_output, args=(out, queue))
p.start()
return q, p
return p


def update_from_output_queue(q, out):
try:
line = q.get_nowait()
except Empty:
return out
def update_from_output_queue(queue, out):
lines = []
while True:
try:
line = queue.get_nowait()
lines.append(line)
except Empty:
break

out += str(line)
out += ''.join(map(str, lines))
return out



@worker_process_init.connect
def configure_workers(*args, **kwargs):
# this sets up Django on nodes started by the worker daemon.
Expand Down Expand Up @@ -101,19 +106,18 @@ def submit_script(**kwargs):
job.save()

stdout, stderr = '', ''
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=abscwd)
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=abscwd, bufsize=0)

# We need to use subprocesses to capture the IO, otherwise they will block one another
# i.e. a check against stderr will sit waiting on stderr before returning
# we use Queues to communicate
qout, pout = output_monitor_queue(proc.stdout)
qerr, perr = output_monitor_queue(proc.stderr)
qout, qerr = Queue(), Queue()
pout = output_monitor_queue(qout, proc.stdout)
perr = output_monitor_queue(qerr, proc.stderr)

prev_std = None

# Loop until the process is complete + both stdout/stderr have EOFd
while proc.poll() is None or pout.is_alive() or perr.is_alive():

def check_output(job, stdout, stderr, prev_std):
# Check for updates from either (non-blocking)
stdout = update_from_output_queue(qout, stdout)
stderr = update_from_output_queue(qerr, stderr)
Expand All @@ -123,6 +127,19 @@ def submit_script(**kwargs):
job.update_realtime(stdout=stdout, stderr=stderr)
prev_std = (stdout, stderr)

return stdout, stderr, prev_std

# Loop until the process is complete + both stdout/stderr have EOFd
while proc.poll() is None or pout.is_alive() or perr.is_alive():
stdout, stderr, prev_std = check_output(job, stdout, stderr, prev_std)

# Catch any remaining output
try:
proc.stdout.flush()
except ValueError: # Handle if stdout is closed
pass
stdout, stderr, prev_std = check_output(job, stdout, stderr, prev_std)

# tar/zip up the generated content for bulk downloads
def get_valid_file(cwd, name, ext):
out = os.path.join(cwd, name)
Expand Down Expand Up @@ -185,6 +202,7 @@ def get_valid_file(cwd, name, ext):

return (stdout, stderr)


@celery_app.task(base=WooeyTask)
def cleanup_wooey_jobs(**kwargs):
from django.utils import timezone
Expand All @@ -199,11 +217,12 @@ def cleanup_wooey_jobs(**kwargs):
if user_settings:
WooeyJob.objects.filter(user__isnull=False, created_date__lte=now-user_settings).delete()


celery_app.conf.update(
CELERYBEAT_SCHEDULE = {
CELERYBEAT_SCHEDULE={
'cleanup-old-jobs': {
'task': 'wooey.tasks.cleanup_wooey_jobs',
'schedule': crontab(hour=0, minute=0), # cleanup at midnight each day
'schedule': crontab(hour=0, minute=0), # cleanup at midnight each day
},
}
)
)
68 changes: 34 additions & 34 deletions wooey/templates/wooey/jobs/job_view.html
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,13 @@ <h1 class="text-center">{{ job_error }}</h1>
<!-- Output command and stderr, stdout in console-style view -->
<div class="col-sm-12 col-md-12">
<div class="thumbnail panel panel-{% if job_info.job.stderr %}danger{% else %}default{% endif %}">
<div class="panel-heading"><span class="glyphicon glyphicon-console"></span> {% trans "Console" %} <a class="icon icon-collapse" data-toggle="collapse" data-target="#collapse-console" href="#collapse-console"></a></div>

<div id="collapse-console" class="panel-collapse collapse in">
<div class="panel-body console">
<pre class="console-body console-command">{{ job_info.job.command }}</pre>
<div class="panel-heading">
<span class="glyphicon glyphicon-console"></span> {% trans "Console" %} <a class="icon icon-collapse" data-toggle="collapse" data-target="#collapse-console" href="#collapse-console"></a>
</div>
<div id="collapse-console" class="panel-collapse collapse in">
<div class="panel-body console">
<pre id="job-command" class="console-body console-command">{{ job_info.job.command }}</pre>
</div>
</div>
</div>
</div>
Expand All @@ -154,30 +155,21 @@ <h1 class="text-center">{{ job_error }}</h1>

<div id="collapse-files" class="panel-collapse collapse in">
<div class="panel-body panel-nopad">
<table class="table table-striped">
<tr>
<th>{% trans "Filename" %}</th>
<th>{% trans "Parameter" %}</th>
<th></th>
<th></th>
<th style="text-align:center;">{% trans "Size" %}</th>
</tr>
{% for file in job_info.all_files %}
<table id="job-files" class="table table-striped">
<thead>
<tr>
<th>
{% if file.filetype == 'images' %}<span class="glyphicon glyphicon-picture"></span>
{% elif file.filetype == 'fasta' %}<span class="glyphicon glyphicon-list-alt"></span>
{% elif file.filetype == 'tabular' %}<span class="glyphicon glyphicon-list-alt"></span>
{% else %}<span class="glyphicon glyphicon-file"></span>{% endif %}

<a href="{{ file.url }}">{{ file.basename }}</a>
</th>
<td>{% if file.slug %}{{ file.slug }}{% endif %}</td>
<td></td>
<td></td>
<td>{% if file.size_bytes %}{{ file.size_bytes|filesizeformat|numericalign }}{% endif %}</td>
<th>{% trans "Filename" %}</th>
<th>{% trans "Parameter" %}</th>
<th></th>
<th></th>
<th style="text-align:center;">{% trans "Size" %}</th>
</tr>
</thead>
<tbody>
{% for file in job_info.all_files %}
{% include 'wooey/jobs/results/table_row.html' with file=file only %}
{% endfor %}
</tbody>
</table>
</div>
</div>
Expand Down Expand Up @@ -228,29 +220,37 @@ <h1 class="text-center">{{ job_error }}</h1>

function createConsoles() {
var $console = $('.panel-body.console');
{% if job_info.job_status|lower == 'completed' %}
{% if job_info.job.status|lower == 'completed' %}
{% if job_info.job.stdout %}
$console.append('<pre id="console-stdout" class="console-body console-stdout">{{ job_info.job.stdout|linebreaks }}</pre>');
$console.append('<pre id="console-stdout" class="console-body console-stdout">{{ job_info.job.stdout|linebreaks|escapejs }}</pre>');
{% endif %}
{% if job_info.job.stderr %}
$console.append('<pre id="console-stderr" class="console-body console-stderr">{{ job_info.job.stderr|linebreaks }}</pre>');
$console.append('<pre id="console-stderr" class="console-body console-stderr">{{ job_info.job.stderr|linebreaks|escapejs }}</pre>');
{% endif %}
{% else %}
$console.append('<pre id="console-stdout" class="console-body console-stdout">{{ job_info.job.stdout|linebreaks }}</pre>');
$console.append('<pre id="console-stderr" class="console-body console-stderr">{{ job_info.job.stderr|linebreaks }}</pre>');
$console.append('<pre id="console-stdout" class="console-body console-stdout">{{ job_info.job.stdout|linebreaks|escapejs }}</pre>');
$console.append('<pre id="console-stderr" class="console-body console-stderr">{{ job_info.job.stderr|linebreaks|escapejs }}</pre>');
{% endif %}
}

function jobRefresh() {
/* Update the job data, items, outputs etc. */
$.get("{% url "wooey:celery_results_json_html" job_info.job.id %}", function(data){
$('#page-content-wrapper').removeClass().addClass('status-' + data['status']);
if($('#console-stdout').length == 0)
var $commandText = $('#job-command');
var $stdoutConsole = $('#console-stdout');
if($stdoutConsole.length == 0) {
createConsoles();
$('#console-stdout').text(data['stdout']);
$stdoutConsole = $('#console-stdout');
}
if ($commandText.text().length == 0){
$commandText.html(data['command']);
}
$stdoutConsole.text(data['stdout']);
$('#console-stderr').text(data['stderr']);

$('#job-outputs').html(data['outputs_html']);
$('#job-outputs').html(data['preview_outputs_html']);
$('#job-files > tbody').html(data['file_outputs_html']);
$("a[data-favorite]").click(toggleFavorite);

if(data['status'] == 'completed'){
Expand Down
15 changes: 15 additions & 0 deletions wooey/templates/wooey/jobs/results/table_row.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{% load i18n %}
{% load wooey_tags %}
<tr>
<th>
{% if file.filetype == 'images' %}<span class="glyphicon glyphicon-picture"></span>
{% elif file.filetype == 'fasta' %}<span class="glyphicon glyphicon-list-alt"></span>
{% elif file.filetype == 'tabular' %}<span class="glyphicon glyphicon-list-alt"></span>
{% else %}<span class="glyphicon glyphicon-file"></span>{% endif %}
<a href="{{ file.url }}">{{ file.basename }}</a>
</th>
<td>{% if file.slug %}{{ file.slug }}{% endif %}</td>
<td></td>
<td></td>
<td>{% if file.size_bytes %}{{ file.size_bytes|filesizeformat|numericalign }}{% endif %}</td>
</tr>
4 changes: 2 additions & 2 deletions wooey/views/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class WooeyRegister(CreateView):
fields = ('username', 'email', 'password')

def dispatch(self, request, *args, **kwargs):
if wooey_settings.WOOEY_AUTH is False:
if wooey_settings.WOOEY_AUTH == False:
return HttpResponseRedirect(wooey_settings.WOOEY_REGISTER_URL)
return super(WooeyRegister, self).dispatch(request, *args, **kwargs)

Expand Down Expand Up @@ -49,7 +49,7 @@ def get_success_url(self):


def wooey_login(request):
if wooey_settings.WOOEY_AUTH is False:
if wooey_settings.WOOEY_AUTH == False:
return HttpResponseRedirect(wooey_settings.WOOEY_LOGIN_URL)
User = get_user_model()
form = modelform_factory(User, fields=('username', 'password'))
Expand Down
4 changes: 2 additions & 2 deletions wooey/views/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ def post(self, request, *args, **kwargs):
version_pk = form.cleaned_data.get('wooey_type')
script_version = ScriptVersion.objects.get(pk=version_pk)
valid = utils.valid_user(script_version.script, request.user).get('valid')
if valid is True:
if valid == True:
group_valid = utils.valid_user(script_version.script.script_group, request.user).get('valid')
if valid is True and group_valid is True:
if valid == True and group_valid == True:
job = utils.create_wooey_job(script_version_pk=version_pk, user=user, data=form.cleaned_data)
job.submit_to_celery()
return {'valid': True, 'job_id': job.id}
Expand Down
Loading

0 comments on commit 15a56fe

Please sign in to comment.