summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sys/src/9/port/devmnt.c14
-rw-r--r--sys/src/9/port/devstream.c580
-rw-r--r--sys/src/9/port/portdat.h3
-rw-r--r--sys/src/cmd/cp.c13
4 files changed, 0 insertions, 610 deletions
diff --git a/sys/src/9/port/devmnt.c b/sys/src/9/port/devmnt.c
index 52f093a1a..e6bbd02dc 100644
--- a/sys/src/9/port/devmnt.c
+++ b/sys/src/9/port/devmnt.c
@@ -30,8 +30,6 @@ struct Mntrpc
uint rpclen; /* len of buffer */
Block* b; /* reply blocks */
Mntrpc* flushed; /* message this one flushes */
- void *iocomarg; /* Rpc completion callback for pipelining */
- void (*iocomfun)(void*, int);
char done; /* Rpc completed */
};
@@ -1017,9 +1015,6 @@ mountio(Mnt *m, Mntrpc *r)
lock(m);
r->z = &up->sleep;
r->m = m;
- r->iocomarg = up->iocomarg;
- r->iocomfun = up->iocomfun;
- up->iocomfun = nil;
r->list = m->queue;
m->queue = r;
unlock(m);
@@ -1044,10 +1039,6 @@ mountio(Mnt *m, Mntrpc *r)
if(devtab[m->c->type]->write(m->c, r->rpc, n, 0) != n)
error(Emountrpc);
- /* Rpc commited */
- if(r->iocomfun != nil)
- (*r->iocomfun)(r->iocomarg, 0);
-
/* Gate readers onto the mount point one at a time */
for(;;) {
lock(m);
@@ -1190,11 +1181,6 @@ mountmux(Mnt *m, Mntrpc *r)
/* look for a reply to a message */
if(q->request.tag == r->reply.tag) {
*l = q->list;
-
- /* Rpc completed */
- if(q->iocomfun != nil)
- (*q->iocomfun)(q->iocomarg, 1);
-
if(q == r) {
q->done = 1;
unlock(m);
diff --git a/sys/src/9/port/devstream.c b/sys/src/9/port/devstream.c
deleted file mode 100644
index 8c150633e..000000000
--- a/sys/src/9/port/devstream.c
+++ /dev/null
@@ -1,580 +0,0 @@
-#include "u.h"
-#include "../port/lib.h"
-#include "mem.h"
-#include "dat.h"
-#include "fns.h"
-#include "../port/error.h"
-
-typedef struct Stream Stream;
-typedef struct Iocom Iocom;
-
-struct Stream
-{
- Ref;
- Lock;
-
- int iounit;
- int noseek;
-
- Ref nrp;
- Ref nwp;
- Ref nwq;
-
- Proc *rp[4];
- Proc *wp[2];
-
- Block *rlist;
-
- vlong soff;
- vlong roff;
- vlong woff;
-
- QLock rcl;
- QLock wcl;
- QLock rql;
- QLock wql;
-
- Rendez wz;
-
- Queue *rq;
- Queue *wq;
- Chan *f;
-};
-
-struct Iocom
-{
- Proc *p;
- QLock *q;
- Stream *s;
- Block *b;
-};
-
-static void
-putstream(Stream *s)
-{
- if(decref(s))
- return;
- freeblist(s->rlist);
- qfree(s->rq);
- qfree(s->wq);
- if(s->f != nil)
- cclose(s->f);
- free(s);
-}
-
-#define BOFF(b) (*(vlong*)((b)->rp - sizeof(vlong)))
-#define BDONE (1<<15)
-#define BERROR (1<<14)
-
-static Block*
-sblock(Stream *s)
-{
- Block *b;
-
- b = allocb(sizeof(vlong)+s->iounit);
- b->flag &= ~(BDONE|BERROR);
- b->wp += sizeof(vlong);
- b->rp = b->wp;
- return b;
-}
-
-static void
-iocom(void *arg, int complete)
-{
- Iocom *io = arg;
- Stream *s;
- QLock *q;
- Proc *p;
-
- p = io->p;
- if(complete && p == up){
- up->iocomfun = nil;
- up->iocomarg = nil;
- }
-
- q = io->q;
- if(q != nil && p == up){
- io->q = nil;
- qunlock(q);
- }
-
- s = io->s;
- if(complete && s != nil && s->noseek){
- io->s = nil;
- lock(s);
- BOFF(io->b) = s->soff;
- s->soff += s->iounit;
- unlock(s);
- }
-}
-
-static void
-ioq(Iocom *io, QLock *q)
-{
- eqlock(q); /* unlocked in iocom() above */
-
- io->p = up;
- io->q = q;
- io->s = nil;
- io->b = nil;
-
- up->iocomarg = io;
- up->iocomfun = iocom;
-}
-
-static void
-streamreader(void *arg)
-{
- Stream *s = arg;
- Iocom io;
- Chan *f;
- Block *b, *l, **ll;
- vlong o;
- int id, n;
-
- id = incref(&s->nrp) % nelem(s->rp);
- s->rp[id] = up;
-
- f = s->f;
- b = sblock(s);
- qlock(&s->rql);
- if(waserror()){
- qhangup(s->rq, up->errstr);
- goto Done;
- }
- if(s->noseek == -1){
- BOFF(b) = 0;
- n = devtab[f->type]->read(f, b->wp, s->iounit, 0x7fffffffffffffLL);
-
- if(n > 0){
- b->wp += n;
- b->flag |= BDONE;
- b->next = nil;
- s->rlist = b;
- s->soff = s->iounit;
- s->roff = 0;
- s->noseek = 1;
-
- b = sblock(s);
- } else {
- s->noseek = 0;
- }
- }
- while(!qisclosed(s->rq)) {
- ll = &s->rlist;
- while((l = *ll) != nil){
- if((l->flag & BDONE) == 0 || BOFF(l) != s->roff){
- if(s->noseek){
- ll = &l->next;
- continue;
- }
- break;
- }
- if((l->flag & BERROR) != 0)
- error((char*)l->rp);
- if(BLEN(l) == 0){
- qhangup(s->rq, nil);
- poperror();
- goto Done;
- }
- s->roff += s->noseek ? s->iounit : BLEN(l);
- *ll = l->next;
- l->next = nil;
- qbwrite(s->rq, l);
- }
-
- n = s->iounit;
- o = s->roff;
- l = s->rlist;
- if(s->noseek) {
- o = 0;
- b->next = l;
- s->rlist = b;
- } else if(l == nil) {
- b->next = nil;
- s->rlist = b;
- } else {
- if(o < BOFF(l)){
- n = BOFF(l) - o;
- b->next = l;
- s->rlist = b;
- } else {
- for(;; l = l->next){
- if((l->flag & BDONE) != 0 && BLEN(l) == 0)
- goto Done;
- o = BOFF(l) + ((l->flag & BDONE) == 0 ? s->iounit : BLEN(l));
- if(l->next == nil)
- break;
- if(o < BOFF(l->next)){
- n = BOFF(l->next) - o;
- break;
- }
- }
- b->next = l->next;
- l->next = b;
- }
- }
- BOFF(b) = o;
- qunlock(&s->rql);
-
- if(waserror()){
- poperror();
- goto Exit;
- }
- ioq(&io, &s->rcl);
- io.b = b;
- io.s = s;
- if(waserror()){
- strncpy((char*)b->wp, up->errstr, s->iounit-1);
- b->wp[s->iounit-1] = 0;
- n = -1;
- } else {
- n = devtab[f->type]->read(f, b->wp, n, o);
- if(n < 0)
- error(Eio);
- poperror();
- }
- iocom(&io, 1);
- poperror();
-
- l = b;
- b = sblock(s);
- qlock(&s->rql);
- if(n >= 0)
- l->wp += n;
- else
- l->flag |= BERROR;
- l->flag |= BDONE;
- }
- poperror();
-Done:
- qunlock(&s->rql);
- freeb(b);
-Exit:
- s->rp[id] = nil;
- putstream(s);
- pexit("closed", 1);
-}
-
-static void
-streamwriter(void *arg)
-{
- Stream *s = arg;
- Iocom io;
- Block *b;
- Chan *f;
- vlong o;
- int id, n;
-
- id = incref(&s->nwp) % nelem(s->wp);
- s->wp[id] = up;
-
- f = s->f;
- while(!qisclosed(s->wq)) {
- if(incref(&s->nwq) == s->nwp.ref && qlen(s->wq) == 0)
- wakeup(&s->wz); /* queue drained */
- if(waserror()){
- decref(&s->nwq);
- break;
- }
- ioq(&io, &s->wcl);
- b = qbread(s->wq, s->iounit);
- decref(&s->nwq);
- if(b == nil){
- iocom(&io, 1);
- break;
- }
- poperror();
-
- if(waserror()){
- qhangup(s->wq, up->errstr);
- iocom(&io, 1);
- freeb(b);
- break;
- }
- n = BLEN(b);
- o = s->woff;
- s->woff += n;
- if(devtab[f->type]->write(f, b->rp, n, o) != n)
- error(Eio);
- iocom(&io, 1);
- freeb(b);
- poperror();
- }
-
- s->wp[id] = nil;
- wakeup(&s->wz);
-
- putstream(s);
- pexit("closed", 1);
-}
-
-static int
-streamgen(Chan *c, char *, Dirtab*, int, int s, Dir *dp)
-{
- static int perm[] = { 0400, 0200, 0600, 0 };
- Fgrp *fgrp = up->fgrp;
- Chan *f;
- Qid q;
-
- if(s == DEVDOTDOT){
- devdir(c, c->qid, ".", 0, eve, DMDIR|0555, dp);
- return 1;
- }
- if(s == 0)
- return 0;
- s--;
- if(s > fgrp->maxfd)
- return -1;
- if((f=fgrp->fd[s]) == nil)
- return 0;
- sprint(up->genbuf, "%dstream", s);
- mkqid(&q, s+1, 0, QTFILE);
- devdir(c, q, up->genbuf, 0, eve, perm[f->mode&3], dp);
- return 1;
-}
-
-static Chan*
-streamattach(char *spec)
-{
- return devattach(L'¶', spec);
-}
-
-static Walkqid*
-streamwalk(Chan *c, Chan *nc, char **name, int nname)
-{
- return devwalk(c, nc, name, nname, (Dirtab *)0, 0, streamgen);
-}
-
-static int
-streamstat(Chan *c, uchar *db, int n)
-{
- return devstat(c, db, n, (Dirtab *)0, 0L, streamgen);
-}
-
-static Chan*
-streamopen(Chan *c, int omode)
-{
- Stream *s;
-
- c->aux = nil;
- if(c->qid.type & QTDIR){
- if(omode != 0)
- error(Eisdir);
- c->mode = 0;
- c->flag |= COPEN;
- c->offset = 0;
- return c;
- }
- s = mallocz(sizeof(*s), 1);
- if(s == nil)
- error(Enomem);
- incref(s);
- if(waserror()){
- putstream(s);
- nexterror();
- }
- omode = openmode(omode);
- s->f = fdtochan(c->qid.path - 1, omode, 0, 1);
- if(s->f == nil || s->f->qid.type != QTFILE)
- error(Eperm);
- s->noseek = -1;
- s->roff = s->f->offset;
- s->woff = s->f->offset;
- s->iounit = s->f->iounit;
- if(s->iounit <= 0 || s->iounit > qiomaxatomic)
- s->iounit = qiomaxatomic;
- c->iounit = s->iounit;
- c->aux = s;
- c->mode = omode;
- c->flag |= COPEN;
- c->offset = 0;
- poperror();
- return c;
-}
-
-static int
-isdrained(void *a)
-{
- Stream *s;
- int i;
-
- s = a;
- if(s->wq == nil)
- return 1;
-
- if(qisclosed(s->wq) == 0)
- return qlen(s->wq) == 0 && s->nwq.ref == s->nwp.ref;
-
- for(i=0; i<nelem(s->wp); i++)
- if(s->wp[i] != nil)
- return 0;
-
- return 1;
-}
-
-static void
-streamdrain(Chan *c)
-{
- Stream *s;
-
- if((s = c->aux) == nil)
- return;
- eqlock(&s->wql);
- if(waserror()){
- qunlock(&s->wql);
- nexterror();
- }
- while(!isdrained(s))
- sleep(&s->wz, isdrained, s);
- qunlock(&s->wql);
- poperror();
-}
-
-static void
-streamclose(Chan *c)
-{
- Stream *s;
- int i;
-
- if((c->flag & COPEN) == 0 || (s = c->aux) == nil)
- return;
- if(s->rq != nil){
- qclose(s->rq);
- for(i=0; i<nelem(s->rp); i++)
- postnote(s->rp[i], 1, "streamclose", 0);
- }
- if(s->wq != nil){
- qhangup(s->wq, nil);
- if(!waserror()){
- streamdrain(c);
- poperror();
- }
- qclose(s->wq); /* discard the data */
- for(i=0; i<nelem(s->wp); i++)
- postnote(s->wp[i], 1, "streamclose", 0);
- }
- c->aux = nil;
- putstream(s);
-}
-
-static int
-canpipeline(Chan *f, int mode)
-{
- USED(mode);
-
- return devtab[f->type]->dc == 'M';
-}
-
-static Queue*
-streamqueue(Chan *c, int mode)
-{
- Stream *s;
- int i, n;
-
- s = c->aux;
- if(s == nil || c->qid.type != QTFILE)
- error(Eperm);
-
- switch(mode){
- case OREAD:
- while(s->rq == nil){
- qlock(&s->rql);
- if(s->rq != nil){
- qunlock(&s->rql);
- break;
- }
- s->rq = qopen(conf.pipeqsize, 0, 0, 0);
- if(s->rq == nil){
- qunlock(&s->rql);
- error(Enomem);
- }
- n = canpipeline(s->f, mode) ? nelem(s->rp) : 1;
- for(i=0; i<n; i++){
- incref(s);
- kproc("streamreader", streamreader, s);
- }
- while(s->nrp.ref != n)
- sched();
- qunlock(&s->rql);
- break;
- }
- return s->rq;
- case OWRITE:
- while(s->wq == nil){
- qlock(&s->wql);
- if(s->wq != nil){
- qunlock(&s->wql);
- break;
- }
- s->wq = qopen(conf.pipeqsize, 0, 0, 0);
- if(s->wq == nil){
- qunlock(&s->wql);
- error(Enomem);
- }
- n = canpipeline(s->f, mode) ? nelem(s->wp) : 1;
- for(i=0; i<n; i++){
- incref(s);
- kproc("streamwriter", streamwriter, s);
- }
- while(s->nwp.ref != n)
- sched();
- qunlock(&s->wql);
- break;
- }
- return s->wq;
- }
- error(Egreg);
- return nil;
-}
-
-static long
-streamread(Chan *c, void *va, long n, vlong)
-{
- if(c->qid.type == QTDIR)
- return devdirread(c, va, n, (Dirtab *)0, 0L, streamgen);
- return qread(streamqueue(c, OREAD), va, n);
-}
-
-static Block*
-streambread(Chan *c, long n, ulong)
-{
- return qbread(streamqueue(c, OREAD), n);
-}
-
-static long
-streamwrite(Chan *c, void *va, long n, vlong)
-{
- if(n == 0)
- streamdrain(c);
- return qwrite(streamqueue(c, OWRITE), va, n);
-}
-
-static long
-streambwrite(Chan *c, Block *b, ulong)
-{
- if(BLEN(b) == 0)
- streamdrain(c);
- return qbwrite(streamqueue(c, OWRITE), b);
-}
-
-Dev streamdevtab = {
- L'¶',
- "stream",
-
- devreset,
- devinit,
- devshutdown,
- streamattach,
- streamwalk,
- streamstat,
- streamopen,
- devcreate,
- streamclose,
- streamread,
- streambread,
- streamwrite,
- streambwrite,
- devremove,
- devwstat,
-};
diff --git a/sys/src/9/port/portdat.h b/sys/src/9/port/portdat.h
index 4c55cbcfb..ae047da66 100644
--- a/sys/src/9/port/portdat.h
+++ b/sys/src/9/port/portdat.h
@@ -777,9 +777,6 @@ struct Proc
PMMU;
char *syscalltrace; /* syscall trace */
-
- void *iocomarg; /* I/O completion callback for pipelining */
- void (*iocomfun)(void*, int);
};
enum
diff --git a/sys/src/cmd/cp.c b/sys/src/cmd/cp.c
index 88eaead4f..341d32c14 100644
--- a/sys/src/cmd/cp.c
+++ b/sys/src/cmd/cp.c
@@ -127,19 +127,6 @@ copy(char *from, char *to, int todir)
if(buflen <= 0)
buflen = DEFB;
- if(dirb->length/2 > buflen){
- char nam[32];
-
- snprint(nam, sizeof nam, "/fd/%dstream", fdf);
- fds = open(nam, OREAD);
- if(fds >= 0){
- close(fdf);
- fdf = fds;
- }
- snprint(nam, sizeof nam, "/fd/%dstream", fdt);
- fds = open(nam, OWRITE);
- }
-
if(copy1(fdf, fds < 0 ? fdt : fds, from, to)==0){
if(fds >= 0 && write(fds, "", 0) < 0){
fprint(2, "cp: error writing %s: %r\n", to);