diff options
| -rw-r--r-- | sys/src/9/port/cache.c | 163 | ||||
| -rw-r--r-- | sys/src/9/port/devmnt.c | 376 | ||||
| -rw-r--r-- | sys/src/9/port/portdat.h | 26 | ||||
| -rw-r--r-- | sys/src/9/port/portfns.h | 2 | ||||
| -rw-r--r-- | sys/src/9/port/qio.c | 30 |
5 files changed, 482 insertions, 115 deletions
diff --git a/sys/src/9/port/cache.c b/sys/src/9/port/cache.c index e1f449df4..306228731 100644 --- a/sys/src/9/port/cache.c +++ b/sys/src/9/port/cache.c @@ -15,20 +15,30 @@ enum NBITMAP = (PGROUND(MAXCACHE)/BY2PG + MAPBITS-1) / MAPBITS, }; +/* devmnt.c: parallel read ahread implementation */ +extern void mntrahinit(Mntrah *rah); +extern long mntrahread(Mntrah *rah, Chan *c, uchar *buf, long len, vlong off); + typedef struct Mntcache Mntcache; struct Mntcache { - Qid qid; - int dev; - int type; + Qid qid; + int dev; + int type; QLock; + Proc *locked; + ulong nlocked; + Mntcache *hash; Mntcache *prev; Mntcache *next; /* page bitmap of valid pages */ ulong bitmap[NBITMAP]; + + /* read ahead state */ + Mntrah rah; }; typedef struct Cache Cache; @@ -124,13 +134,66 @@ clookup(Chan *c, int skipvers) return nil; } +/* + * resursive Mntcache locking. Mntcache.rah is protected by the + * same lock and we want to call cupdate() from mntrahread() + * while holding the lock. + */ +static int +cancachelock(Mntcache *m) +{ + if(m->locked == up || canqlock(m)){ + m->locked = up; + m->nlocked++; + return 1; + } + return 0; +} +static void +cachelock(Mntcache *m) +{ + if(m->locked != up){ + qlock(m); + assert(m->nlocked == 0); + m->locked = up; + } + m->nlocked++; + +} +static void +cacheunlock(Mntcache *m) +{ + assert(m->locked == up); + if(--m->nlocked == 0){ + m->locked = nil; + qunlock(m); + } +} + +/* return locked Mntcache if still valid else reset mcp */ +static Mntcache* +ccache(Chan *c) +{ + Mntcache *m; + + m = c->mcp; + if(m != nil) { + cachelock(m); + if(eqchantdqid(c, m->type, m->dev, m->qid, 0) && c->qid.type == m->qid.type) + return m; + c->mcp = nil; + cacheunlock(m); + } + return nil; +} + void copen(Chan *c) { Mntcache *m, *f, **l; - /* directories aren't cacheable and append-only files confuse us */ - if(c->qid.type&(QTDIR|QTAPPEND)){ + /* directories aren't cacheable */ + if(c->qid.type&QTDIR){ c->mcp = nil; return; } @@ -156,9 +219,9 @@ copen(Chan *c) l = &f->hash; } - if(!canqlock(m)){ + if(!cancachelock(m)){ unlock(&cache); - qlock(m); + cachelock(m); lock(&cache); f = clookup(c, 0); if(f != nil) { @@ -169,7 +232,7 @@ copen(Chan *c) */ ctail(f); unlock(&cache); - qunlock(m); + cacheunlock(m); c->mcp = f; return; } @@ -182,27 +245,16 @@ copen(Chan *c) l = &cache.hash[c->qid.path%NHASH]; m->hash = *l; *l = m; + unlock(&cache); + + m->rah.vers = m->qid.vers; + mntrahinit(&m->rah); cnodata(m); - qunlock(m); - c->mcp = m; -} -/* return locked Mntcache if still valid else reset mcp */ -static Mntcache* -ccache(Chan *c) -{ - Mntcache *m; + cacheunlock(m); - m = c->mcp; - if(m != nil) { - qlock(m); - if(eqchantdqid(c, m->type, m->dev, m->qid, 0) && c->qid.type == m->qid.type) - return m; - c->mcp = nil; - qunlock(m); - } - return nil; + c->mcp = m; } enum { @@ -243,17 +295,24 @@ cread(Chan *c, uchar *buf, int len, vlong off) KMap *k; Page *p; Mntcache *m; - int l, total; + int l, tot; ulong offset, pn, po, pe; - if(off >= MAXCACHE || len <= 0) + if(len <= 0) return 0; m = ccache(c); if(m == nil) return 0; - total = 0; + if(waserror()){ + cacheunlock(m); + nexterror(); + } + + tot = 0; + if(off >= MAXCACHE) + goto Prefetch; offset = off; if(offset+len > MAXCACHE) @@ -277,16 +336,16 @@ cread(Chan *c, uchar *buf, int len, vlong off) if(waserror()) { kunmap(k); putpage(p); - qunlock(m); nexterror(); } memmove(buf, (uchar*)VA(k) + offset, l); - poperror(); kunmap(k); - putpage(p); + poperror(); - total += l; + tot += l; + buf += l; + len -= l; offset += l; offset &= (BY2PG-1); @@ -294,12 +353,21 @@ cread(Chan *c, uchar *buf, int len, vlong off) break; pn++; - buf += l; - len -= l; } - qunlock(m); - return total; +Prefetch: + if(len > 0){ + if(m->rah.vers != m->qid.vers){ + mntrahinit(&m->rah); + m->rah.vers = m->qid.vers; + } + off += tot; + tot += mntrahread(&m->rah, c, buf, len, off); + } + cacheunlock(m); + poperror(); + + return tot; } /* invalidate pages in page bitmap */ @@ -322,7 +390,7 @@ cachedata(Mntcache *m, uchar *buf, int len, vlong off) ulong offset, pn, po, pe; if(off >= MAXCACHE || len <= 0){ - qunlock(m); + cacheunlock(m); return; } @@ -370,7 +438,7 @@ cachedata(Mntcache *m, uchar *buf, int len, vlong off) kunmap(k); putpage(p); invalidate(m, offset + pn*BY2PG, len); - qunlock(m); + cacheunlock(m); nexterror(); } memmove((uchar*)VA(k) + offset, buf, l); @@ -383,7 +451,7 @@ cachedata(Mntcache *m, uchar *buf, int len, vlong off) buf += l; len -= l; } - qunlock(m); + cacheunlock(m); } void @@ -407,5 +475,22 @@ cwrite(Chan* c, uchar *buf, int len, vlong off) return; m->qid.vers++; c->qid.vers++; + if(c->qid.type&QTAPPEND){ + cacheunlock(m); + return; + } cachedata(m, buf, len, off); } + +void +cclunk(Chan *c) +{ + Mntcache *m; + + m = ccache(c); + if(m == nil) + return; + mntrahinit(&m->rah); + cacheunlock(m); + c->mcp = nil; +} diff --git a/sys/src/9/port/devmnt.c b/sys/src/9/port/devmnt.c index 91d6ad2f9..18bea77bd 100644 --- a/sys/src/9/port/devmnt.c +++ b/sys/src/9/port/devmnt.c @@ -42,7 +42,7 @@ enum NMASK = (64*1024)>>TAGSHIFT, }; -struct Mntalloc +static struct Mntalloc { Lock; Mnt* list; /* Mount devices in use */ @@ -54,21 +54,21 @@ struct Mntalloc ulong tagmask[NMASK]; }mntalloc; -Mnt* mntchk(Chan*); -void mntdirfix(uchar*, Chan*); -Mntrpc* mntflushalloc(Mntrpc*, ulong); -Mntrpc* mntflushfree(Mnt*, Mntrpc*); -void mntfree(Mntrpc*); -void mntgate(Mnt*); -void mntqrm(Mnt*, Mntrpc*); -Mntrpc* mntralloc(Chan*, ulong); -long mntrdwr(int, Chan*, void*, long, vlong); -int mntrpcread(Mnt*, Mntrpc*); -void mountio(Mnt*, Mntrpc*); -void mountmux(Mnt*, Mntrpc*); -void mountrpc(Mnt*, Mntrpc*); -int rpcattn(void*); -Chan* mntchan(void); +static Chan* mntchan(void); +static Mnt* mntchk(Chan*); +static void mntdirfix(uchar*, Chan*); +static Mntrpc* mntflushalloc(Mntrpc*, ulong); +static Mntrpc* mntflushfree(Mnt*, Mntrpc*); +static void mntfree(Mntrpc*); +static void mntgate(Mnt*); +static void mntqrm(Mnt*, Mntrpc*); +static Mntrpc* mntralloc(Chan*, ulong); +static long mntrdwr(int, Chan*, void*, long, vlong); +static int mntrpcread(Mnt*, Mntrpc*); +static void mountio(Mnt*, Mntrpc*); +static void mountmux(Mnt*, Mntrpc*); +static void mountrpc(Mnt*, Mntrpc*); +static int rpcattn(void*); char Esbadstat[] = "invalid directory entry received from server"; char Enoversion[] = "version not established for mount channel"; @@ -257,7 +257,6 @@ mntauth(Chan *c, char *spec) Mntrpc *r; m = c->mux; - if(m == nil){ mntversion(c, VERSION9P, MAXRPC, 0); m = c->mux; @@ -275,7 +274,6 @@ mntauth(Chan *c, char *spec) } r = mntralloc(0, m->msize); - if(waserror()) { mntfree(r); nexterror(); @@ -319,7 +317,6 @@ mntattach(char *muxattach) c = bogus.chan; m = c->mux; - if(m == nil){ mntversion(c, nil, 0, 0); m = c->mux; @@ -342,7 +339,6 @@ mntattach(char *muxattach) mntfree(r); nexterror(); } - r->request.type = Tattach; r->request.fid = c->fid; if(bogus.authchan == nil) @@ -368,7 +364,7 @@ mntattach(char *muxattach) return c; } -Chan* +static Chan* mntchan(void) { Chan *c; @@ -552,13 +548,13 @@ mntclunk(Chan *c, int t) Mnt *m; Mntrpc *r; + cclunk(c); m = mntchk(c); r = mntralloc(c, m->msize); if(waserror()){ mntfree(r); nexterror(); } - r->request.type = t; r->request.fid = c->fid; mountrpc(m, r); @@ -658,24 +654,59 @@ mntwrite(Chan *c, void *buf, long n, vlong off) return mntrdwr(Twrite, c, buf, n, off); } -long +static void +mntcache(Mntrpc *r) +{ + ulong n, m; + vlong off; + Block *b; + Chan *c; + + c = r->c; + if((c->flag & CCACHE) == 0 || c->mcp == nil) + return; + off = r->request.offset; + switch(r->reply.type){ + case Rread: + m = r->reply.count; + if(m > r->request.count) + m = r->request.count; + + for(b = r->b; m > 0 && b != nil; b = b->next) { + n = BLEN(b); + if(m < n) + n = m; + cupdate(c, b->rp, n, off); + off += n; + } + break; + case Rwrite: + if(convM2S(r->rpc, r->rpclen, &r->request) == 0) + panic("convM2S"); + cwrite(c, (uchar*)r->request.data, r->request.count, off); + break; + } +} + +static long mntrdwr(int type, Chan *c, void *buf, long n, vlong off) { Mnt *m; Mntrpc *r; char *uba; - int cache; ulong cnt, nr, nreq; m = mntchk(c); uba = buf; cnt = 0; - cache = c->flag & CCACHE; - if(c->qid.type & QTDIR) - cache = 0; + for(;;) { - if(cache && type == Tread) { - nr = cread(c, (uchar*)uba, n, off); + nreq = n; + if(nreq > m->msize-IOHDRSZ) + nreq = m->msize-IOHDRSZ; + + if(type == Tread) { + nr = cread(c, (uchar*)uba, nreq, off); if(nr > 0) { nreq = nr; goto Next; @@ -691,43 +722,12 @@ mntrdwr(int type, Chan *c, void *buf, long n, vlong off) r->request.fid = c->fid; r->request.offset = off; r->request.data = uba; - nr = n; - if(nr > m->msize-IOHDRSZ) - nr = m->msize-IOHDRSZ; - r->request.count = nr; + r->request.count = nreq; mountrpc(m, r); - nreq = r->request.count; + mntcache(r); nr = r->reply.count; if(nr > nreq) nr = nreq; - - if(cache) { - /* - * note that we cannot update the cache from uba as - * the user could change its contents from another - * process before the data gets copied to the cached. - */ - if(type == Tread) { - ulong nc, nn; - Block *b; - - nc = 0; - for(b = r->b; b != nil; b = b->next) { - nn = BLEN(b); - if(nc+nn > nr) - nn = nr - nc; - cupdate(c, b->rp, nn, off + nc); - nc += nn; - if(nc >= nr) - break; - } - } else { - if(convM2S(r->rpc, r->rpclen, &r->request) == 0) - panic("convM2S"); - cwrite(c, (uchar*)r->request.data, nr, off); - } - } - if(type == Tread) r->b = bl2mem((uchar*)uba, r->b, nr); mntfree(r); @@ -744,7 +744,231 @@ mntrdwr(int type, Chan *c, void *buf, long n, vlong off) return cnt; } +static int +mntprocwork(void *a) +{ + Mntproc *p = a; + return p->f != nil; +} + +static void +mntproc(void *a) +{ + Mntproc *p = a; + Chan *c; + Mnt *m; + + while(waserror()) + ; + + m = p->m; + for(;;){ + tsleep(p, mntprocwork, p, 500); + + lock(m); + if(p->f == nil){ + p->m = nil; + unlock(m); + pexit("no work", 1); + } + c = p->r->c; + unlock(m); + + (*p->f)(p->r, p->a); + + lock(m); + p->r = nil; + p->a = nil; + p->f = nil; + unlock(m); + + cclose(c); + } +} + +static int +mntdefer(void (*f)(Mntrpc*, void*), Mntrpc *r, void *a) +{ + Mntproc *p; + Mnt *m; + int i; + + m = mntchk(r->c); + lock(m); + for(i = 0; i < nelem(m->defered); i++){ + p = &m->defered[i]; + if(p->f != nil) + continue; + + incref(r->c); + r->m = m; + p->r = r; + p->a = a; + p->f = f; + + if(p->m == nil){ + p->m = m; + unlock(m); + kproc("mntporc", mntproc, p); + } else { + unlock(m); + wakeup(p); + } + return 1; + } + unlock(m); + return 0; +} + +static void +rahproc(Mntrpc *r, void *a) +{ + Mntrah *rah = a; + + if(!waserror()){ + mountrpc(r->m, r); + poperror(); + } + r->done = 2; + wakeup(rah); +} + +static int +rahdone(void *v) +{ + Mntrpc *r = v; + return r->done == 2; +} + +static Mntrpc* +rahfindrpc(Mntrah *rah, vlong off) +{ + Mntrpc *r; + int i, n; + vlong o; + + for(i=0; i<nelem(rah->r); i++){ + if((r = rah->r[i]) == nil) + continue; + n = r->request.count; + o = r->request.offset; + if(off >= o && off < o+n) + return r; + } + return nil; +} + void +mntrahinit(Mntrah *rah) +{ + Mntrpc *r; + int i; + + while(waserror()) + ; + + for(i=0; i<nelem(rah->r); i++){ + if((r = rah->r[i]) != nil){ + while(!rahdone(r)) + sleep(rah, rahdone, r); + rah->r[i] = nil; + mntfree(r); + } + } + rah->i = 0; + + rah->off = 0; + rah->seq = 0; + + poperror(); +} + +long +mntrahread(Mntrah *rah, Chan *c, uchar *buf, long len, vlong off) +{ + Mntrpc *r, **rr; + vlong o, w, e; + long n, tot; + Mnt *m; + + if(len <= 0) + return 0; + if(off != rah->off){ + rah->off = off; + rah->seq = 0; + } + rah->off += len; + rah->seq += len; + if(rah->seq >= 2*c->iounit){ + w = (off / c->iounit) * c->iounit; + e = w + rah->seq; + for(o = w; w < e; o += c->iounit){ + if(rahfindrpc(rah, o) != nil) + continue; + + rr = &rah->r[rah->i % nelem(rah->r)]; + if((r = *rr) != nil){ + if(!rahdone(r) || (r->request.offset >= w && r->request.offset < e)) + break; + *rr = nil; + mntfree(r); + } + + m = mntchk(c); + r = mntralloc(c, m->msize); + r->request.type = Tread; + r->request.fid = c->fid; + r->request.offset = o; + r->request.count = c->iounit; + if(!mntdefer(rahproc, r, rah)){ + mntfree(r); + break; + } + *rr = r; + rah->i++; + } + } + + tot = 0; + while(len > 0 && (r = rahfindrpc(rah, off)) != nil){ + while(!rahdone(r)) + sleep(rah, rahdone, r); + + switch(r->reply.type){ + default: + error(Emountrpc); + case Rflush: + error(Eintr); + case Rerror: + error(r->reply.ename); + case Rread: + break; + } + mntcache(r); + n = r->request.count; + o = r->request.offset; + if(r->reply.count < n) + n = r->reply.count; + n -= (off - o); + if(n <= 0) + break; + if(len < n) + n = len; + n = readblist(r->b, buf, n, off - o); + buf += n; + off += n; + tot += n; + len -= n; + } + if(tot > 0){ + rah->off -= len; + rah->seq -= len; + } + + return tot; +} + +static void mountrpc(Mnt *m, Mntrpc *r) { char *sn, *cn; @@ -778,7 +1002,7 @@ mountrpc(Mnt *m, Mntrpc *r) } } -void +static void mountio(Mnt *m, Mntrpc *r) { int n; @@ -870,7 +1094,7 @@ doread(Mnt *m, int len) return 0; } -int +static int mntrpcread(Mnt *m, Mntrpc *r) { int i, t, len, hlen; @@ -942,7 +1166,7 @@ mntrpcread(Mnt *m, Mntrpc *r) return 0; } -void +static void mntgate(Mnt *m) { Mntrpc *q; @@ -957,7 +1181,7 @@ mntgate(Mnt *m) unlock(m); } -void +static void mountmux(Mnt *m, Mntrpc *r) { Mntrpc **l, *q; @@ -1003,7 +1227,7 @@ mountmux(Mnt *m, Mntrpc *r) * Create a new flush request and chain the previous * requests from it */ -Mntrpc* +static Mntrpc* mntflushalloc(Mntrpc *r, ulong iounit) { Mntrpc *fr; @@ -1027,7 +1251,7 @@ mntflushalloc(Mntrpc *r, ulong iounit) * and if it hasn't been answered set the reply to to * Rflush. Return the original rpc. */ -Mntrpc* +static Mntrpc* mntflushfree(Mnt *m, Mntrpc *r) { Mntrpc *fr; @@ -1046,7 +1270,7 @@ mntflushfree(Mnt *m, Mntrpc *r) return r; } -int +static int alloctag(void) { int i, j; @@ -1066,13 +1290,13 @@ alloctag(void) return NOTAG; } -void +static void freetag(int t) { mntalloc.tagmask[t>>TAGSHIFT] &= ~(1<<(t&TAGMASK)); } -Mntrpc* +static Mntrpc* mntralloc(Chan *c, ulong msize) { Mntrpc *new; @@ -1122,7 +1346,7 @@ mntralloc(Chan *c, ulong msize) return new; } -void +static void mntfree(Mntrpc *r) { if(r->b != nil) @@ -1142,7 +1366,7 @@ mntfree(Mntrpc *r) unlock(&mntalloc); } -void +static void mntqrm(Mnt *m, Mntrpc *r) { Mntrpc **l, *f; @@ -1161,7 +1385,7 @@ mntqrm(Mnt *m, Mntrpc *r) unlock(m); } -Mnt* +static Mnt* mntchk(Chan *c) { Mnt *m; @@ -1190,7 +1414,7 @@ mntchk(Chan *c) * reflect local values. These entries are known to be * the first two in the Dir encoding after the count. */ -void +static void mntdirfix(uchar *dirbuf, Chan *c) { uint r; @@ -1202,7 +1426,7 @@ mntdirfix(uchar *dirbuf, Chan *c) PBIT32(dirbuf, c->dev); } -int +static int rpcattn(void *v) { Mntrpc *r; diff --git a/sys/src/9/port/portdat.h b/sys/src/9/port/portdat.h index 106f016f8..16f24a10d 100644 --- a/sys/src/9/port/portdat.h +++ b/sys/src/9/port/portdat.h @@ -16,7 +16,9 @@ typedef struct Log Log; typedef struct Logflag Logflag; typedef struct Mntcache Mntcache; typedef struct Mount Mount; +typedef struct Mntrah Mntrah; typedef struct Mntrpc Mntrpc; +typedef struct Mntproc Mntproc; typedef struct Mnt Mnt; typedef struct Mhead Mhead; typedef struct Note Note; @@ -270,6 +272,29 @@ struct Mhead Mhead* hash; /* Hash chain */ }; +struct Mntrah +{ + Rendez; + + ulong vers; + + vlong off; + vlong seq; + + uint i; + Mntrpc *r[8]; +}; + +struct Mntproc +{ + Rendez; + + Mnt *m; + Mntrpc *r; + void *a; + void (*f)(Mntrpc*, void*); +}; + struct Mnt { Lock; @@ -277,6 +302,7 @@ struct Mnt Chan *c; /* Channel to file service */ Proc *rip; /* Reader in progress */ Mntrpc *queue; /* Queue of pending requests on this channel */ + Mntproc defered[8]; /* Worker processes for defered RPCs (read ahead) */ ulong id; /* Multiplexer id for channel check */ Mnt *list; /* Free list */ int flags; /* cache */ diff --git a/sys/src/9/port/portfns.h b/sys/src/9/port/portfns.h index 2f135c31b..db547d1a1 100644 --- a/sys/src/9/port/portfns.h +++ b/sys/src/9/port/portfns.h @@ -42,6 +42,7 @@ void confinit(void); int consactive(void); void (*consdebug)(void); void copen(Chan*); +void cclunk(Chan*); Block* concatblock(Block*); Block* copyblock(Block*, int); void copypage(Page*, Page*); @@ -283,6 +284,7 @@ int rand(void); void randominit(void); ulong randomread(void*, ulong); void rdb(void); +long readblist(Block *, uchar *, long, long); int readnum(ulong, char*, ulong, ulong, int); int readstr(ulong, char*, ulong, char*); void ready(Proc*); diff --git a/sys/src/9/port/qio.c b/sys/src/9/port/qio.c index d34178f46..9355ace11 100644 --- a/sys/src/9/port/qio.c +++ b/sys/src/9/port/qio.c @@ -906,6 +906,36 @@ qremove(Queue *q) /* * copy the contents of a string of blocks into + * memory from an offset. blocklist kept unchanged. + * return number of copied bytes. + */ +long +readblist(Block *b, uchar *p, long n, long o) +{ + long m, r; + + r = 0; + while(n > 0 && b != nil){ + m = BLEN(b); + if(o >= m) + o -= m; + else { + m -= o; + if(n < m) + m = n; + memmove(p, b->rp + o, m); + p += m; + r += m; + n -= m; + o = 0; + } + b = b->next; + } + return r; +} + +/* + * copy the contents of a string of blocks into * memory. emptied blocks are freed. return * pointer to first unconsumed block. */ |
