@global mksctlmux
;

{

/*
 * mkmux(fd): build a multiplexer over the sctl connection FD.
 *
 * The mux tags outgoing requests, matches replies to per-tag handlers,
 * and routes Aevent messages to the ctl bound under the id carried in
 * the event body.  Returns a muxrec whose fields are the closures
 * defined below.
 */
@defloc mkmux(fd)
{
	@local ctls, taghandler, nexttag, freetags, aborted, verbose;

	@record muxrec {
		/* user interface */
		abort,
		run,

		/* ctl interface */
		bindctl,
		unbindctl,
		msgsync,	/* msgsync(op, body) */
		msgasync,	/* msgasync(op, body, handler) */
	};

	verbose = 0;
	aborted = 0;		/* set to 1 by abort(); tested by run/runtil */
	nexttag = 0;		/* next never-used tag */
	freetags = [ ];		/* tags released by dispatch, available for reuse */
	ctls = [:];		/* ctl id -> ctl */
	taghandler = [:];	/* in-flight tag -> reply handler */

	/* Close the connection and mark the mux as aborted. */
	@defloc abort()
	{
		close(fd);
		aborted = 1;
		return nil;
	}

	/* Register CTL under ctl.id(); it is an error to bind an id twice. */
	@defloc bindctl(ctl)
	{
		if(ctls[ctl.id()])
			error("ctl %d is already bound to mux", ctl.id());
		ctls[ctl.id()] = ctl;
	}

	/* Remove CTL from the mux; it is an error if it is not bound. */
	@defloc unbindctl(ctl)
	{
		if(ctls[ctl.id()] == nil)
			error("ctl %d is not bound to mux", ctl.id());
		tabdelete(ctls, ctl.id());
	}

	/* Return a tag for a new request, preferring a recycled one. */
	@defloc freshtag()
	{
		if(isempty(freetags))
			return nexttag++;
		return pop(freetags);
	}

	/* Make TAG available to freshtag() again. */
	@defloc reusetag(tag)
	{
		push(freetags, tag);
	}

	/* printf-style diagnostic, emitted only when verbose is set. */
	@defloc log(arg ...)
	{
		if(verbose){
			apply(printf, arg);
			printf("\n");
		}
	}

	/*
	 * Read SZ bytes from FD with fread.  When fread yields nil, raise
	 * an error if errno reports one; otherwise return nil.
	 */
	@defloc xread(fd, sz)
	{
		@local p, e;

		p = fread(fd, sz);
		if(p == nil){
			e = errno();
			if(e[0])
				error("ctlmux: recvmsg: %d %s", getpid(), e[1]);
			return nil;
		}
		return p;
	}

	/*
	 * Read one framed message: a uint64 length followed by that many
	 * bytes, which begin with an sctl`Msg header.
	 * Returns [header, body], or [nil, 'closed] when the length prefix
	 * could not be read.
	 */
	@defloc recvmsg(fd)
	{
		@local p, sz, rep, dat;

		p = xread(fd, sizeof(litdom`uint64));
		if(p == nil)
			return [nil, 'closed];
		p = (uint64*)p;
		sz = *p++;
		/* a frame shorter than the header would make the body
		   length computed below wrap around */
		if(sz < sizeof(sctl`Msg))
			error("ctlmux: short message from sctl (%d bytes)", sz);
		p = xread(fd, sz);
		if(p == nil)
			error("ctlmux: remote sctl hung up");
		rep = (sctl`Msg*)p;
		dat = getbytes(rep+1, sz-sizeof(sctl`Msg));
		return [rep, dat];
	}

	/*
	 * sendmsg(fd, p) or sendmsg(fd, p, sz): write a uint64 length
	 * followed by SZ bytes starting at P.  SZ defaults to sizeof(*p).
	 */
	@defloc sendmsg(fd, arg ...)
	{
		@local p, sz, e;

		p = arg[0];
		switch(length(arg)){
		case 1:
			sz = sizeof(*p);
			break;
		case 2:
			sz = arg[1];
			break;
		default:
			error("bad sendmsg");
		}
//		if(verbose)
//			printf("(send) %s\n", fmtsctlmsg(p, sz));
		write(fd, cval2str((uint64)sz));
		e = errno();
		if(e[0] != 0)
			error("sctl sendmsg: %s", e[1]);
		write(fd, getbytes(p, sz));
		e = errno();
		if(e[0] != 0)
			error("sctl sendmsg: %s", e[1]);
	}

	/*
	 * Build a request (sctl`Msg header with OP and TAG, followed by the
	 * bytes of DAT) and send it.  Shared by msgasync and msgsync.
	 */
	@defloc sendreq(op, dat, tag)
	{
		@local p, m;

		p = m = mkxs();
		p = (sctl`Msg*)p;
		p->op = op;
		p->tag = tag;
		p++;
		p = (char*)p;
		putbytes(p, dat);
		p += length(dat);
		sendmsg(fd, m, (char*)p-m);
	}

	/*
	 * Route one received message.  Aevent messages go to the ctl whose
	 * id is the first uint64 of the body; anything else is treated as
	 * a reply and handed to the handler registered for its tag.
	 */
	@defloc dispatch(rep, dat)
	{
		@local p, id, ctl, tag, h;

		if(rep->op == sctl`Aevent){
			p = (uint64*)dat;
			id = *p++;
			ctl = ctls[id];
			if(ctl == nil)
				log("received event for unknown ctl %d", id);
			else
				ctl.event(rep, dat);
		}else{
			tag = rep->tag;
			h = taghandler[tag];
			tabdelete(taghandler, tag);
			if(h == nil){
				/* Do not recycle a tag we have no record of:
				   pushing it onto freetags could later hand
				   the same tag to two concurrent requests. */
				log("received reply for unknown tag %d", tag);
			}else{
				reusetag(tag);
				h(rep, dat);
			}
		}
	}

	/* True when no ctl is bound. */
	@defloc noctls()
	{
		return length(ctls) == 0;
	}

	/* True when no request is awaiting a reply. */
	@defloc noinflight()
	{
		return length(taghandler) == 0;
	}

	/*
	 * Receive and dispatch messages until PRED() is true or the mux
	 * has been aborted.  It is an error to wait while there are neither
	 * bound ctls nor in-flight requests, or for the connection to close.
	 */
	@defloc runtil(pred)
	{
		@local rep, dat;

		/* loop form of the original self-call: the same checks run,
		   in the same order, before every receive */
		while(1){
			if(aborted)
				return;
			if(pred())
				return;
			if(noinflight() && noctls())
				error("ctlmux: runtil: nothing to do");
			[rep, dat] = recvmsg(fd);
			if(rep == nil)
				error("ctlmux: remote sctl hung up");
			dispatch(rep, dat);
		}
	}

	/*
	 * Send request OP with body DAT and arrange for H(rep, dat) to be
	 * called from dispatch when the reply arrives.  Returns nil.
	 */
	@defloc msgasync(op, dat, h)
	{
		@local tag;

		tag = freshtag();
		sendreq(op, dat, tag);
		taghandler[tag] = h;
		return nil;
	}

	/*
	 * Install a handler for TAG that captures the reply, then run the
	 * mux until it fires.  Returns [msg, dat]; msg is nil if runtil
	 * returned without the handler having been called.
	 */
	@defloc runtiltag(fd, tag)
	{
		@local done, msg, dat;

		msg = nil;
		done = 0;
		taghandler[tag] = @lambda(m, d) {
			msg = m;
			dat = d;
			done = 1;
		};
		runtil(@lambda(){ done; });
		return [msg, dat];
	}

	/*
	 * Send request OP with body DAT and process messages until its
	 * reply arrives.  Returns [reply header, reply body].
	 */
	@defloc msgsync(op, dat)
	{
		@local tag;

		tag = freshtag();
		sendreq(op, dat, tag);
		return runtiltag(fd, tag);
	}

	/*
	 * run() or run(pred): call cont() on every bound ctl, then process
	 * messages until PRED() holds (default: until no ctl is bound).
	 * It is an error to run a mux that has been aborted.
	 */
	@defloc run(arg...)
	{
		@local pred;

		if(aborted)
			error("attempt to resume aborted mux");
		foreach(@lambda(id, ctl) { ctl.cont(); }, ctls);
		if(length(arg))
			pred = arg[0];
		else
			pred = noctls;
		runtil(pred);
		return nil;
	}

	return muxrec(abort, run,
		      bindctl, unbindctl, msgsync, msgasync);
}

/*
 * Start a local "sctl" process with popen and return the first of the
 * descriptors it yields, with a finalizer that closes it.
 * NOTE(review): sctlversion is defined outside this file; it is
 * presumably the version handshake -- confirm.
 */
@defloc localsctl()
{
	@local fds, fd;

	fds = popen("sctl", "-c", "-g", 2|4|8);
	fd = fds[0];
	finalize(fd, @lambda(fd) { close(fd); });
	sctlversion(fd);
	return fd;
}

/* Create a mux connected to a freshly started local sctl. */
@define mksctlmux()
{
	return mkmux(localsctl());
}

}