Hello,
Hmmh, you are a brave man, doing this in java :) There are tons of hard
type castings and implied structures in the t and f streams.
I might be wrong but I suspect you are missing a loop over the multiple
records imbedded in every message you receive.
This is the old "reference" implementation in C++ ... messy like hell
(and as I see now, partially aligned with tabs :( --- you should
probably download it and re-format it to make it readable).
This is the MONSTER function that processes the streams:
https://github.com/osschar/gled-all/blob/master/libsets/XrdMon/Glasses/XrdMonSucker.cxx#L230
In particular, for f stream see:
https://github.com/osschar/gled-all/blob/master/libsets/XrdMon/Glasses/XrdMonSucker.cxx#L871-L894
note that this for goes on for a looong time.
t stream is even more complicated (also, you must process all u and d
map records to make heads and tails from it):
https://github.com/osschar/gled-all/blob/master/libsets/XrdMon/Glasses/XrdMonSucker.cxx#L624-L722
Again, the loop in the last referenced line goes on for a while. Here I
even implemented the inner struct local_cache to help with disentangling
of intermingled time-window and other records.
-----
There used to be a java implementation done for monalisa, it worked for
t stream, not sure what happened to it after about 2015 and if they've
added f stream. I can ask, I can not find anything on the webz.
Also, in OSG there is now a python based collector that should handle
both t and f streams. If this could work for you and you would be
willing to extend it (or generaize it) for your needs ... your
contributions would surely be appreciated.
Also also ... the Gled implementation (referenced above) is more or less
still viable ... but it is a big thing, built on top of ROOT (and can
dump detailed records into ROOT files for further analysis, including
file access patterns with vector reads). I have recently compiled it
with the latest ROOT on Fedora 39 (that is in the original SVN repo
somewhere else -- github is just a snapshot from some unknown point).
You should probably tell us what you are trying to do :) There is a
weekly mostly informal xcache/xrootd devops meeting on Thursday 1pm
central ... I can send you the details if you want to join.
Cheers,
Matevz
On 5/6/24 10:26, Christopher Larrieu wrote:
> Hi! Thanks for the response. Unfortunately, I can't link to the source
> repo because it is internal to jlab. However, below is the code (cut
> 'n' pasted). Please note that I do receive plenty of open events. I
> get map events and read/write events. Just no close events.
>
> Cheers,
>
> Chris
>
>
> package jlab.jasmine.xrootd;
>
> import org.apache.logging.log4j.LogManager;
> import org.apache.logging.log4j.Logger;
>
> import io.netty.bootstrap.Bootstrap;
> import io.netty.buffer.ByteBuf;
> import io.netty.channel.ChannelHandlerContext;
> import io.netty.channel.ChannelOption;
> import io.netty.channel.SimpleChannelInboundHandler;
> import io.netty.channel.nio.NioEventLoopGroup;
> import io.netty.channel.socket.DatagramPacket;
> import io.netty.channel.socket.nio.NioDatagramChannel;
> import jlab.scicomp.basic.error.ExceptionUtil;
>
> public class XrootdMonitor {
> private static final Logger logger = LogManager.getLogger();
> private static final int port = 33333;
>
> public static class DatagramHandler extends
> SimpleChannelInboundHandler<DatagramPacket> {
> @Override
> public void exceptionCaught(ChannelHandlerContext ctx,
> Throwable cause) {
> logger.warn(ExceptionUtil.distillErrorMessage(cause), cause);
> // ctx.close();
> }
>
> private static final int isClose = 0;
> private static final int isOpen = 1;
> private static final int isTime = 2;
> private static final int isXfr = 3;
> private static final int isDisc = 4;
>
> private static final int forced = 0x01;
> private static final int hasOPS = 0x02;
> private static final int hasSSQ = 0x04;
> private static final int hasLFN = 0x01;
> private static final int hasRW = 0x02;
> private static final int hasSID = 0x01;
>
> public static class ByteBufStruct {
> final ByteBuf bb;
>
> public ByteBufStruct(ByteBuf bb, int len) {
> this.bb = bb.readSlice(len);
> }
> }
>
> public static class XrdXrootdMonHeader extends ByteBufStruct {
> public XrdXrootdMonHeader(ByteBuf bb) {
> super(bb, 8);
> }
>
> public byte code() {
> return bb.getByte(0);
> }
>
> public byte pseq() {
> return bb.getByte(1);
> }
>
> public int plen() {
> return bb.getUnsignedShort(2);
> }
>
> public long stod() {
> return bb.getInt(4);
> }
>
> @Override
> public String toString() {
> return String.format("code=%c, pseq=%d, plen=%d,
> stod=%d", code(), pseq(), plen(), stod());
> }
> }
>
> public static class XrdXrootdMonFileHdr extends ByteBufStruct {
> public XrdXrootdMonFileHdr(ByteBuf bb) {
> super(bb, 8);
> }
>
> public int recType() {
> return bb.getByte(0);
> }
>
> public int recFlag() {
> return bb.getByte(1);
> }
>
> public short recSize() {
> return bb.getShort(2);
> }
>
> public long fileID() {
> return bb.getUnsignedInt(4);
> }
>
> public long userID() {
> return bb.getUnsignedInt(4);
> }
>
> public short[] nRecs() {
> return new short[] { bb.getShort(4), bb.getShort(6) };
> }
>
> @Override
> public String toString() {
> return String.format("recType=%d, recSize=%d,
> fileID=%d, userID=%d, nRecs=%d,%d",
> recType(), recSize(), fileID(), userID(),
> nRecs()[0], nRecs()[1]);
> }
> }
>
> public static class XrdXrootdMonFileRec extends ByteBufStruct {
> final XrdXrootdMonFileHdr hdr;
>
> public XrdXrootdMonFileRec(ByteBuf bb, int len) {
> this(new XrdXrootdMonFileHdr(bb), bb, len);
> }
>
> public XrdXrootdMonFileRec(XrdXrootdMonFileHdr hdr, ByteBuf
> bb, int len) {
> super(bb, len);
> this.hdr = hdr;
> }
>
> public XrdXrootdMonFileHdr hdr() {
> return hdr;
> }
> }
>
> public static class XrdXrootdMonFileTOD extends
> XrdXrootdMonFileRec {
> public XrdXrootdMonFileTOD(ByteBuf bb) {
> this(new XrdXrootdMonFileHdr(bb), bb);
> }
>
> public XrdXrootdMonFileTOD(XrdXrootdMonFileHdr hdr, ByteBuf
> bb) {
> super(hdr, bb, 16);
> }
>
> public int tBeg() {
> return bb.getInt(0);
> }
>
> public int tEnd() {
> return bb.getInt(4);
> }
>
> public long sID() {
> return bb.getLong(8) & XROOTD_MON_SIDMASK;
> }
>
> @Override
> public String toString() {
> return String.format("TOD tBeg=%d, tEnd=%d, sID=%d",
> tBeg(), tEnd(), sID());
> }
> }
>
> public static class XrdXrootdMonFileLFN extends ByteBufStruct {
> final String lfn;
>
> public XrdXrootdMonFileLFN(ByteBuf bb) {
> super(bb, 4);
> var sb = new StringBuilder();
> for (var c = bb.readByte(); c != 0; c = bb.readByte()) {
> sb.append((char) c);
> }
> lfn = sb.toString();
> }
>
> public long user() {
> return bb.getUnsignedInt(0);
> }
>
> public String lfn() {
> return lfn;
> }
>
> @Override
> public String toString() {
> return String.format("LFN user=%d, lfn=%s", user(), lfn());
> }
> }
>
> public static class XrdXrootdMonFileOPN extends
> XrdXrootdMonFileRec {
> final XrdXrootdMonFileLFN ufn;
>
> public XrdXrootdMonFileOPN(XrdXrootdMonFileHdr hdr, ByteBuf
> bb) {
> super(hdr, bb, 8);
> ufn = hasLFN() ? new XrdXrootdMonFileLFN(bb) : null;
> }
>
> public long fsz() {
> return bb.getLong(0);
> }
>
> public boolean hasLFN() {
> return (hdr.recFlag() & hasLFN) != 0;
> }
>
> public XrdXrootdMonFileLFN ufn() {
> return ufn;
> }
>
> @Override
> public String toString() {
> return String.format("OPN fsz=%d, hasLFN=%b, ufn=%s",
> fsz(), hasLFN(), ufn());
> }
> }
>
> public static class XrdXrootdMonFileCLS extends
> XrdXrootdMonFileRec {
>
> public XrdXrootdMonFileCLS(XrdXrootdMonFileHdr hdr, ByteBuf
> bb) {
> super(hdr, bb, 24);
> }
>
> public long read() {
> return bb.getLong(0);
> }
>
> public long readv() {
> return bb.getLong(8);
> }
>
> public long write() {
> return bb.getLong(16);
> }
>
> @Override
> public String toString() {
> return String.format("CLS read=%,d, readv=%d,
> write=%d", read(), readv(), write());
> }
> }
>
> public static class XrdXrootdMonMap extends ByteBufStruct {
> final String info;
>
> public XrdXrootdMonMap(XrdXrootdMonHeader hdr, ByteBuf bb) {
> super(bb, 4);
> var sb = new StringBuilder();
> for (var c = bb.readByte(); bb.readerIndex() <
> hdr.plen(); c = bb.readByte()) {
> if (c == '\n') {
> c = '|';
> }
> sb.append((char) c);
> }
> info = sb.toString();
> }
>
> public long dictid() {
> return bb.getUnsignedInt(0);
> }
>
> public String info() {
> return info;
> }
>
> @Override
> public String toString() {
> return String.format("dictid=%d, info=%s", dictid(),
> info());
> }
> }
>
> public static final int XROOTD_MON_OPEN = 0x80; // File has
> been opened
> public static final int XROOTD_MON_READV = 0x90; // Details
> for a kXR_readv request
> public static final int XROOTD_MON_READU = 0x91; // Unpacked
> details for kXR_readv
> public static final int XROOTD_MON_APPID = 0xa0; //
> Application provided marker
> public static final int XROOTD_MON_CLOSE = 0xc0; // File has
> been closed
> public static final int XROOTD_MON_DISC = 0xd0; // Client has
> disconnected
> public static final int XROOTD_MON_WINDOW = 0xe0; // Window
> timing mark
> public static final int XROOTD_MON_RW = 0x7f; // Read/Write
> public static final int XROOTD_MON_FORCED = 0x01;
> public static final int XROOTD_MON_BOUNDP = 0x02;
> public static final long XROOTD_MON_SIDMASK = 0x0000ffffffffffffL;
>
> public static class XrdXrootdMonTrace extends ByteBufStruct {
> final byte[] id;
> final short[] sVal;
> final long[] rTot;
>
> public XrdXrootdMonTrace(ByteBuf bb) {
> super(bb, 16);
> id = new byte[8];
> bb.getBytes(0, id);
> sVal = new short[] {
> bb.getShort(0), bb.getShort(2), bb.getShort(4),
> bb.getShort(6)
> };
> rTot = new long[] {
> bb.getUnsignedInt(0), bb.getUnsignedInt(4)
> };
> }
>
> public int type() {
> if ((int) id[0] <= XROOTD_MON_RW) {
> return XROOTD_MON_RW;
> }
> return (int) id[0];
> }
>
> public long arg0_val() {
> return bb.getLong(0);
> }
>
> public byte[] arg0_id() {
> return id;
> }
>
> public short[] arg0_sVal() {
> return sVal;
> }
>
> public long[] arg0_rTot() {
> return rTot;
> }
>
> public int arg1_buflen() {
> return bb.getInt(8);
> }
>
> // public long arg1_HostID() {
> // return bb.getUnsignedInt(8);
> // }
>
> public long arg1_wTot() {
> return bb.getUnsignedInt(8);
> }
>
> public int arg1_Window() {
> return bb.getInt(8);
> }
>
> public long arg2_dictid() {
> return bb.getUnsignedInt(12);
> }
>
> public int arg2_Window() {
> return bb.getInt(12);
> }
>
> @Override
> public String toString() {
> StringBuilder sb = new StringBuilder();
> for (int i = 0; i < id.length; ++i) {
> sb.append(String.format("%2x ", id[i]));
> }
> return String.format("arg0 = %s, arg1 = %d, arg2=%d",
> sb, arg1_wTot(), arg2_dictid());
> }
> }
> public static class XrdXrootdMonTraceRec {
> XrdXrootdMonTrace rec;
>
> public XrdXrootdMonTraceRec(XrdXrootdMonTrace rec) {
> this.rec = rec;
> }
> }
>
> public static long longFromBytes(byte[] bytes, int start, int
> len) {
> long n = 0;
> for (int i = 0; i < len; ++i) {
> n = (n << 8) | bytes[i];
> }
> return n;
> }
> public static class XrdXrootdMonTraceOPEN extends
> XrdXrootdMonTraceRec {
>
> public XrdXrootdMonTraceOPEN(XrdXrootdMonTrace rec) {
> super(rec);
> }
>
> public long fsz() {
> return longFromBytes(rec.arg0_id(), 1, 7);
> }
>
> public long dictid() {
> return rec.arg2_dictid();
> }
>
> @Override
> public String toString() {
> return String.format("OPEN fsz=%,d, dictid=%d", fsz(),
> dictid());
> }
> }
> public static class XrdXrootdMonTraceCLOSE extends
> XrdXrootdMonTraceRec {
>
> public XrdXrootdMonTraceCLOSE(XrdXrootdMonTrace rec) {
> super(rec);
> }
>
> public long dictid() {
> return rec.arg2_dictid();
> }
>
> public long rTot() {
> var n = longFromBytes(rec.arg0_id(), 1, 1);
> return rec.arg0_rTot()[1] << n;
> }
>
> public long wTot() {
> var n = longFromBytes(rec.arg0_id(), 2, 1);
> return rec.arg1_wTot() << n;
> }
>
> @Override
> public String toString() {
> return String.format("CLOSE dictid=%d, rTot=%,d,
> wTot=%,d", dictid(), rTot(), wTot());
> }
> }
> public static class XrdXrootdMonTraceRW extends
> XrdXrootdMonTraceRec {
>
> public XrdXrootdMonTraceRW(XrdXrootdMonTrace rec) {
> super(rec);
> }
>
> public int val() {
> return (int)rec.arg0_val();
> }
>
> public int bufflen() {
> return rec.arg1_buflen();
> }
>
> public int len() {
> return Math.abs(bufflen());
> }
>
> public char rw() {
> return bufflen() > 0 ? 'r' : 'w';
> }
>
> public long dictid() {
> return rec.arg2_dictid();
> }
>
> @Override
> public String toString() {
> return String.format(" rw=%c, val=%,d, len=%,d,
> dictid=%d", rw(), val(), len(), dictid());
> }
> }
>
> public static class XrdXrootdMonTraceAPPID extends
> XrdXrootdMonTraceRec {
> String appid;
>
> public XrdXrootdMonTraceAPPID(XrdXrootdMonTrace rec) {
> super(rec);
> }
>
> public String appid() {
> return new String(rec.arg0_id(), 4, 12);
> }
>
> @Override
> public String toString() {
> return "appid="+appid();
> }
> }
>
> public static class XrdXrootdMonTraceDISC extends
> XrdXrootdMonTraceRec {
> String appid;
>
> public XrdXrootdMonTraceDISC(XrdXrootdMonTrace rec) {
> super(rec);
> }
>
> public boolean boundp() {
> return (rec.arg0_id()[0] & XROOTD_MON_BOUNDP) != 0;
> }
>
> public boolean forced() {
> return (rec.arg0_id()[0] & XROOTD_MON_FORCED) != 0;
> }
>
> public int nsecs() {
> return rec.arg1_buflen();
> }
>
> public long dictid() {
> return rec.arg2_dictid();
> }
>
> @Override
> public String toString() {
> return String.format("bound=%b, forced=%b, nsecs=%,d,
> dictid=%d", boundp(), forced(), nsecs(), dictid());
> }
> }
>
> @Override
> protected void channelRead0(ChannelHandlerContext ctx,
> DatagramPacket dgram) throws Exception {
> // logger.info("dgram={}", dgram.toString());
> var bb = (ByteBuf) dgram.content();
>
> var hdr = new XrdXrootdMonHeader(bb);
> // logger.info("hdr = "+hdr);
>
> // let's make sure the buffer ends when the information ends
> bb.setIndex(bb.readerIndex(), hdr.plen());
>
> switch (hdr.code()) {
> case 'f': {
> while (bb.isReadable()) {
> var startPos = bb.readerIndex();
> var fhdr = new XrdXrootdMonFileHdr(bb);
> logger.info("fhdr = " + fhdr);
>
> XrdXrootdMonFileRec rec = switch (fhdr.recType()) {
> case isOpen -> new
> XrdXrootdMonFileOPN(fhdr, bb);
> case isTime -> new XrdXrootdMonFileTOD(bb);
> case isClose -> new
> XrdXrootdMonFileCLS(fhdr, bb);
> default -> null;
> };
> logger.info("rec = " + ((rec == null) ? "?" :
> rec));
>
> // Move on to next record
> bb.setIndex(startPos + fhdr.recSize(),
> bb.writerIndex());
> }
> break;
> }
> case '=':
> case 'd':
> case 'i':
> case 'p':
> case 'T':
> case 'u':
> case 'U':
> case 'x': {
> var map = new XrdXrootdMonMap(hdr, bb);
> logger.info(String.format("map(%c) = %s",
> hdr.code(), map));
> break;
> }
> case 't': {
> while (bb.isReadable() && bb.readerIndex() <
> hdr.plen()) {
> var rec = new XrdXrootdMonTrace(bb);
> var info = switch(rec.type()) {
> case XROOTD_MON_OPEN -> new
> XrdXrootdMonTraceOPEN(rec);
> case XROOTD_MON_CLOSE -> new
> XrdXrootdMonTraceCLOSE(rec);
> case XROOTD_MON_RW -> new
> XrdXrootdMonTraceRW(rec);
> case XROOTD_MON_APPID -> new
> XrdXrootdMonTraceAPPID(rec);
> case XROOTD_MON_DISC -> new
> XrdXrootdMonTraceDISC(rec);
> default -> rec;
> };
> logger.info("info = " + info);
> }
> break;
> }
>
> default: {
> logger.info("hdr = " + hdr);
> }
> }
> }
> }
>
> public static void run() throws Exception {
> var bossGroup = new NioEventLoopGroup();
> var b = new Bootstrap();
> b.group(bossGroup)
> .channel(NioDatagramChannel.class)
> .option(ChannelOption.SO_BROADCAST, true)
> .localAddress(port)
> .handler(new DatagramHandler());
>
> logger.info("bootstrap={}", b.toString());
>
> // Bind and start to accept incoming connections.
> final var f = b.bind().sync().channel();
> logger.info("channel={}", f.toString());
> }
>
> public static void main(String[] args) throws Exception {
> run();
> Thread.sleep(100000L);
> }
> }
> ------------------------------------------------------------------------
> *From:* Matevz Tadel <[log in to unmask]>
> *Sent:* Monday, May 6, 2024 1:18 PM
> *To:* Christopher Larrieu <[log in to unmask]>;
> [log in to unmask] <[log in to unmask]>
> *Subject:* [EXTERNAL] Re: Monitoring Data Stream
> Hi, there should be a close event (a different one for t and f stream,
> iirc, it's been a while :) ), unless your tests do something strange,
> like silently dropping the connection (some VMs and NATs do that).
>
> Can you link to your code in github or similar?
>
> Cheers,
> Matevz
>
> On 5/6/24 05:25, Christopher Larrieu wrote:
>> Hello,
>>
>> I started playing around with the xrootd monitoring system, wrote some
>> java code to receive and decode the UDP datagrams. But I notice I never
>> see any CLOSE events, either from the f-stream or t-stream. Is this normal?
>>
>> Thanks,
>>
>> Chris
>>
>> ------------------------------------------------------------------------
>>
>> Use REPLY-ALL to reply to list
>>
>> To unsubscribe from the XROOTD-L list, click the following link:
>> https://urldefense.proofpoint.com/v2/url?u=https-3A__listserv.slac.stanford.edu_cgi-2Dbin_wa-3FSUBED1-3DXROOTD-2DL-26A-3D1&d=DwIDaQ&c=CJqEzB1piLOyyvZjb8YUQw&r=NgCtsEGR5qbmr9ksstuq9q0xGVKyB2KaHJPp7JKtYmY&m=vTl-GIfZvGOiacP5dOfbDNYEFn38uSKBBK4GIaRMkveRRG8YNbKkor105utg5eyO&s=FTjQBWGR6iiKu7x5njeeD2JGJiXc9aKtFrtN9Bb1c78&e= <https://urldefense.proofpoint.com/v2/url?u=https-3A__listserv.slac.stanford.edu_cgi-2Dbin_wa-3FSUBED1-3DXROOTD-2DL-26A-3D1&d=DwIDaQ&c=CJqEzB1piLOyyvZjb8YUQw&r=NgCtsEGR5qbmr9ksstuq9q0xGVKyB2KaHJPp7JKtYmY&m=vTl-GIfZvGOiacP5dOfbDNYEFn38uSKBBK4GIaRMkveRRG8YNbKkor105utg5eyO&s=FTjQBWGR6iiKu7x5njeeD2JGJiXc9aKtFrtN9Bb1c78&e=>
>> <https://urldefense.com/v3/__https://listserv.slac.stanford.edu/cgi-bin/wa?SUBED1=XROOTD-L&A=1__;!!Mih3wA!GlwYaAdclcVQrf_46DH_YLcMQSe0RxhitY2SLBkn_ep-9dt_ph1e-194KQt-bLG9zf_UjQB2Ud17W2kY76q3$ <https://urldefense.com/v3/__https://listserv.slac.stanford.edu/cgi-bin/wa?SUBED1=XROOTD-L&A=1__;!!Mih3wA!GlwYaAdclcVQrf_46DH_YLcMQSe0RxhitY2SLBkn_ep-9dt_ph1e-194KQt-bLG9zf_UjQB2Ud17W2kY76q3$>>
>>
########################################################################
Use REPLY-ALL to reply to list
To unsubscribe from the XROOTD-L list, click the following link:
https://listserv.slac.stanford.edu/cgi-bin/wa?SUBED1=XROOTD-L&A=1
|