summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoraiju <aiju@phicode.de>2011-08-16 22:00:34 +0200
committeraiju <aiju@phicode.de>2011-08-16 22:00:34 +0200
commitc65100ffa0031d7a4744b3715b4c4c16da9074e9 (patch)
tree4fd4f3bc670fc2ce1ed6217a13478ac3286c4b40
parent2f2c93066909f04e346f739f6ed30f536a85fd28 (diff)
downloadplan9front-c65100ffa0031d7a4744b3715b4c4c16da9074e9.tar.xz
lib9p: added toilet queues
-rw-r--r--sys/include/9p.h24
-rw-r--r--sys/src/lib9p/mkfile1
-rw-r--r--sys/src/lib9p/queue.c113
3 files changed, 138 insertions, 0 deletions
diff --git a/sys/include/9p.h b/sys/include/9p.h
index 85e2304ba..a3415d246 100644
--- a/sys/include/9p.h
+++ b/sys/include/9p.h
@@ -27,10 +27,28 @@ typedef struct Filelist Filelist;
typedef struct Tree Tree;
typedef struct Readdir Readdir;
typedef struct Srv Srv;
+typedef struct Reqqueue Reqqueue;
+typedef struct Queueelem Queueelem;
#pragma incomplete Filelist
#pragma incomplete Readdir
+struct Queueelem
+{
+ Queueelem *prev, *next;
+ void (*f)(Req *);
+};
+
+struct Reqqueue
+{
+ QLock;
+ Rendez;
+ Queueelem;
+ int pid;
+ Req *cur;
+ jmp_buf flush;
+};
+
struct Fid
{
ulong fid;
@@ -60,6 +78,8 @@ struct Req
Fid* afid;
Fid* newfid;
Srv* srv;
+
+ Queueelem qu;
/* below is implementation-specific; don't use */
QLock lk;
@@ -255,3 +275,7 @@ int authattach(Req*);
extern void (*_forker)(void (*)(void*), void*, int);
+Reqqueue* reqqueuecreate(void);
+void reqqueuepush(Reqqueue*, Req*, void (*)(Req *));
+void reqqueueflush(Reqqueue*, Req*);
+int reqqueueflushed(void);
diff --git a/sys/src/lib9p/mkfile b/sys/src/lib9p/mkfile
index abbf8dc65..78129109b 100644
--- a/sys/src/lib9p/mkfile
+++ b/sys/src/lib9p/mkfile
@@ -12,6 +12,7 @@ OFILES=\
req.$O\
parse.$O\
post.$O\
+ queue.$O\
rfork.$O\
srv.$O\
thread.$O\
diff --git a/sys/src/lib9p/queue.c b/sys/src/lib9p/queue.c
new file mode 100644
index 000000000..307c385fc
--- /dev/null
+++ b/sys/src/lib9p/queue.c
@@ -0,0 +1,113 @@
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+#include <fcall.h>
+#include <9p.h>
+
+static int
+_reqqueuenote(void *uregs, char *note)
+{
+ Reqqueue *q;
+
+ if(strcmp(note, "flush") != 0)
+ return 0;
+ q = *threaddata();
+ if(q != nil){
+ q->cur = nil;
+ notejmp(uregs, q->flush, 1);
+ }
+ return 1;
+}
+
+static void
+_reqqueueproc(void *v)
+{
+ Reqqueue *q;
+ Req *r;
+ void (*f)(Req *);
+
+ q = v;
+ *threaddata() = q;
+ rfork(RFNOTEG);
+ threadnotify(_reqqueuenote, 1);
+ for(;;){
+ qlock(q);
+ q->cur = nil;
+ while(q->next == q)
+ rsleep(q);
+ r = (Req*)(((char*)q->next) - ((char*)&((Req*)0)->qu));
+ r->qu.next->prev = r->qu.prev;
+ r->qu.prev->next = r->qu.next;
+ f = r->qu.f;
+ qlock(&r->lk);
+ memset(&r->qu, 0, sizeof(r->qu));
+ qunlock(&r->lk);
+ q->cur = r;
+ if(setjmp(q->flush)){
+ respond(r, "interrupted");
+ continue;
+ }
+ qunlock(q);
+ f(r);
+ }
+}
+
+Reqqueue *
+reqqueuecreate(void)
+{
+ Reqqueue *q;
+
+ q = emalloc9p(sizeof(*q));
+ memset(q, 0, sizeof(*q));
+ q->l = q;
+ q->next = q->prev = q;
+ q->pid = threadpid(proccreate(_reqqueueproc, q, mainstacksize));
+ print("%d\n", q->pid);
+ return q;
+}
+
+void
+reqqueuepush(Reqqueue *q, Req *r, void (*f)(Req *))
+{
+ qlock(q);
+ r->qu.f = f;
+ r->qu.next = q;
+ r->qu.prev = q->prev;
+ q->prev->next = &r->qu;
+ q->prev = &r->qu;
+ rwakeupall(q);
+ qunlock(q);
+}
+
+void
+reqqueueflush(Reqqueue *q, Req *r)
+{
+ qlock(q);
+ if(q->cur == r){
+ postnote(PNPROC, q->pid, "flush");
+ qunlock(q);
+ }else{
+ if(r->qu.next != nil){
+ r->qu.next->prev = r->qu.prev;
+ r->qu.prev->next = r->qu.next;
+ }
+ qlock(&r->lk);
+ memset(&r->qu, 0, sizeof(r->qu));
+ qunlock(&r->lk);
+ qunlock(q);
+ respond(r, "interrupted");
+ }
+}
+
+int
+reqqueueflushed(void)
+{
+ Reqqueue *q;
+
+ q = *threaddata();
+ qlock(q);
+ if(setjmp(q->flush))
+ return 1;
+ qunlock(q);
+ return 0;
+}