LISTSERV mailing list manager LISTSERV 16.5

Help for HPS-SVN Archives


HPS-SVN Archives

HPS-SVN Archives


HPS-SVN@LISTSERV.SLAC.STANFORD.EDU


View:

Message:

[

First

|

Previous

|

Next

|

Last

]

By Topic:

[

First

|

Previous

|

Next

|

Last

]

By Author:

[

First

|

Previous

|

Next

|

Last

]

Font:

Proportional Font

LISTSERV Archives

LISTSERV Archives

HPS-SVN Home

HPS-SVN Home

HPS-SVN  May 2015

HPS-SVN May 2015

Subject:

r2969 - in /java/trunk/record-util/src/main/java/org/hps/record/evio/crawler: EvioFileCrawler.java EvioFileList.java EvioFileUtilities.java JCacheManager.java RunLog.java RunProcessor.java

From:

[log in to unmask]

Reply-To:

Notification of commits to the hps svn repository <[log in to unmask]>

Date:

Thu, 14 May 2015 19:44:30 -0000

Content-Type:

text/plain

Parts/Attachments:

Parts/Attachments

text/plain (524 lines)

Author: [log in to unmask]
Date: Thu May 14 12:44:21 2015
New Revision: 2969

Log:
Update file crawler prior to testing at JLAB.

Added:
    java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/JCacheManager.java
Modified:
    java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileCrawler.java
    java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileList.java
    java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileUtilities.java
    java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunLog.java
    java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunProcessor.java

Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileCrawler.java
 =============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileCrawler.java	(original)
+++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileCrawler.java	Thu May 14 12:44:21 2015
@@ -40,11 +40,11 @@
                 "timestamp file for date filtering; modified time will be set at end of job");
         OPTIONS.addOption("d", "directory", true, "starting directory");
         OPTIONS.addOption("r", "runs", true, "list of runs to accept (others will be excluded)");
-        OPTIONS.addOption("c", "cache", false, "cache files to /cache/mss from MSS (only works at JLAB)");
         OPTIONS.addOption("s", "summary", false, "print run summary at end of job");
         OPTIONS.addOption("L", "log-level", true, "set log level (INFO, FINE, etc.)");
         OPTIONS.addOption("u", "update", false, "update the run database");
         OPTIONS.addOption("e", "epics", false, "process EPICS data");
+        OPTIONS.addOption("c", "cache", false, "cache all files from MSS");
     }
 
     public static void main(final String[] args) {
@@ -57,21 +57,21 @@
 
     private final Set<Integer> acceptRuns = new HashSet<Integer>();
 
+    private boolean epics = false;
+
+    private final PosixParser parser = new PosixParser();
+
+    private boolean printSummary = false;
+
+    private File rootDir = new File(System.getProperty("user.dir"));
+
+    private Date timestamp = null;
+
+    private File timestampFile = null;
+
+    private boolean update = false;
+    
     private boolean cache = false;
-
-    private boolean epics = false;
-
-    private final PosixParser parser = new PosixParser();
-
-    private boolean printSummary = false;
-
-    private File rootDir = new File(System.getProperty("user.dir"));
-
-    private Date timestamp = null;
-
-    private File timestampFile = null;
-
-    private boolean update = false;
 
     private RunProcessor createRunProcessor(final RunSummary runSummary) {
         final RunProcessor processor = new RunProcessor(runSummary);
@@ -134,20 +134,15 @@
             if (cl.hasOption("u")) {
                 this.update = true;
             }
-
+           
+            if (cl.hasOption("e")) {
+                this.epics = true;
+            }
+            
             if (cl.hasOption("c")) {
                 this.cache = true;
             }
 
-            if (this.cache && (this.printSummary || this.update)) {
-                // If file caching is selected, then printing run summary or updating the database won't work.
-                throw new IllegalArgumentException("File caching cannot be activated with the -p or -u options.");
-            }
-
-            if (cl.hasOption("e")) {
-                this.epics = true;
-            }
-
         } catch (final ParseException e) {
             throw new RuntimeException("Error parsing options.", e);
         }
@@ -155,16 +150,37 @@
         return this;
     }
 
+    private void cacheFiles(final RunLog runs) {
+        JCacheManager cache = new JCacheManager();
+        
+        // Process all files in the runs.
+        for (final int run : runs.getSortedRunNumbers()) {
+                        
+            // Get the run summary for the run.
+            final RunSummary runSummary = runs.getRunSummary(run);
+            
+            // Cache all the files.
+            cache.cache(runSummary.getFiles());
+                        
+            // Wait for cache operation to complete. (~5 minutes max)
+            boolean cached = cache.waitForAll(300000);
+            
+            if (!cached) {
+                throw new RuntimeException("The cache operation did not complete in time.");
+            }
+        }
+    }
+    
     private void processRuns(final RunLog runs) throws Exception {
         // Process all files in the runs.
         for (final int run : runs.getSortedRunNumbers()) {
-
+                        
             // Get the run summary for the run.
             final RunSummary runSummary = runs.getRunSummary(run);
-
+                        
             // Create a processor to process all the EVIO records in the run.
             final RunProcessor processor = createRunProcessor(runSummary);
-
+                                   
             // Process the run, updating the run summary.
             processor.process();
         }
@@ -192,27 +208,27 @@
 
         LOGGER.fine("sorting files by sequence ...");
         runs.sortAllFiles();
-
+        
+        // Cache all the files to disk before processing them.
         if (this.cache) {
-            // Cache files from MSS.
-            runs.cache();
-        } else {
-
-            processRuns(runs);
-
-            // Print the run summaries.
-            if (this.printSummary) {
-                runs.printRunSummaries();
-            }
-
-            // Insert run information into database.
-            if (this.update) {
-                // Update run log.
-                runs.insert();
-            }
-        }
-
-        // Update timestamp file.
+            cacheFiles(runs);
+        }
+
+        // Process all the files in the runs.
+        processRuns(runs);
+
+        // Print the run summaries.
+        if (this.printSummary) {
+            runs.printRunSummaries();
+        }
+
+        // Insert run information into the database.
+        if (this.update) {
+            // Update run log.
+            runs.insert();
+        }
+
+        // Update the timestamp file.
         if (this.timestampFile == null) {
             this.timestampFile = new File("timestamp");
             try {

Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileList.java
 =============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileList.java	(original)
+++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileList.java	Thu May 14 12:44:21 2015
@@ -22,32 +22,12 @@
 
     Map<File, Integer> eventCounts = new HashMap<File, Integer>();
 
-    void cache() {
-        LOGGER.info("running cache commands ...");
-        for (final File file : this) {
-            EvioFileUtilities.cache(file);
-        }
-        LOGGER.info("done running cache commands");
-    }
-
-    void computeEventCount(final File file) {
+    void computeEventCount(EvioReader reader, final File file) throws IOException, EvioException {
+        if (!reader.getPath().equals(file.getPath())) {
+            throw new IllegalArgumentException("The EvioReader and file paths do not match.");
+        }        
         LOGGER.info("computing event count for " + file.getPath() + " ...");
-        int eventCount = 0;
-        EvioReader reader = null;
-        try {
-            reader = new EvioReader(file, false);
-            eventCount += reader.getEventCount();
-        } catch (EvioException | IOException e) {
-            throw new RuntimeException(e);
-        } finally {
-            if (reader != null) {
-                try {
-                    reader.close();
-                } catch (final IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
+        int eventCount = reader.getEventCount();
         this.eventCounts.put(file, eventCount);
         LOGGER.info("done computing event count for " + file.getPath());
     }
@@ -68,8 +48,8 @@
     }
 
     int getEventCount(final File file) {
-        if (!this.eventCounts.containsKey(file)) {
-            computeEventCount(file);
+        if (this.eventCounts.get(file) == null) {
+            throw new RuntimeException("The event count for " + file.getPath() + " was never computed.");
         }
         return this.eventCounts.get(file);
     }

Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileUtilities.java
 =============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileUtilities.java	(original)
+++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileUtilities.java	Thu May 14 12:44:21 2015
@@ -18,18 +18,6 @@
     private static final Logger LOGGER = LogUtil.create(EvioFileUtilities.class);
 
     private static final long MILLISECONDS = 1000L;
-
-    static void cache(final File file) {
-        if (!file.getPath().startsWith("/mss")) {
-            throw new IllegalArgumentException("Only files on /mss can be cached.");
-        }
-        try {
-            new ProcessBuilder("jcache", "submit", "default", file.getPath()).start();
-        } catch (final IOException e) {
-            throw new RuntimeException(e);
-        }
-        LOGGER.info("process started to cache " + file.getPath());
-    }
 
     static Date getControlDate(final File file, final int eventTag, final int gotoEvent) {
         Date date = null;
@@ -149,10 +137,36 @@
     }
 
     static EvioReader open(final File file) throws IOException, EvioException {
+        File openFile = file;
+        if (isTapeFile(file)) {
+            openFile = getCachedFile(file);
+        }        
         final long start = System.currentTimeMillis();
-        final EvioReader reader = new EvioReader(file, false, false);
+        final EvioReader reader = new EvioReader(openFile, false, false);
         final long end = System.currentTimeMillis() - start;
-        LOGGER.info("opened " + file.getPath() + " in " + end / MILLISECONDS + " seconds");
+        LOGGER.info("opened " + openFile.getPath() + " in " + end / MILLISECONDS + " seconds");
         return reader;
     }
+    
+    static EvioReader open(final String path) throws IOException, EvioException {
+        return open(new File(path));
+    }    
+    
+    static File getCachedFile(File file) {
+        if (!isTapeFile(file)) {
+            throw new IllegalArgumentException("File " + file.getPath() + " is not on the JLab MSS.");
+        }
+        if (isCachedFile(file)) {
+            throw new IllegalArgumentException("File " + file.getPath() + " is already on the cache disk.");
+        }
+        return new File("/cache" + file.getPath());
+    }
+    
+    static boolean isTapeFile(File file) {
+        return file.getPath().startsWith("/mss");
+    }
+    
+    static boolean isCachedFile(File file) {
+        return file.getPath().startsWith("/cache");
+    }
 }

Added: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/JCacheManager.java
 =============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/JCacheManager.java	(added)
+++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/JCacheManager.java	Thu May 14 12:44:21 2015
@@ -0,0 +1,149 @@
+package org.hps.record.evio.crawler;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.input.SAXBuilder;
+import org.xml.sax.InputSource;
+
+// sample path => /mss/hallb/hps/data/cosmic_002713.evio.0
+public class JCacheManager {
+
+    Map<File, FileInfo> fileInfos = new HashMap<File, FileInfo>();
+
+    static class FileInfo {
+
+        private Integer requestId = null;
+        private File file = null;
+
+        FileInfo(File file, Integer requestId) {
+            this.requestId = requestId;
+        }
+
+        File getCachedFile() {
+            return new File("/cache" + file.getPath());
+        }
+
+        Integer getRequestId() {
+            return requestId;
+        }
+
+        String getStatus(File file) {
+            Process process = null;
+            try {
+                process = new ProcessBuilder("jcache", "request", requestId.toString()).start();
+            } catch (final IOException e) {
+                throw new RuntimeException(e);
+            }
+            String xmlString = null;
+            try {
+                xmlString = readFully(process.getInputStream(), "US-ASCII");
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            Document xml = buildDocument(xmlString);
+            Element root = xml.getRootElement();
+            String status = root.getChild("request").getChildText("status");
+            return status;
+        }
+
+    }
+    
+    void cache(List<File> files) {
+        for (File file : files) {
+            cache(file);
+        }
+    }   
+
+    void cache(File file) {
+        // LOGGER.info("running cache commands ...");
+        if (!EvioFileUtilities.isCachedFile(file)) {
+            throw new IllegalArgumentException("Only files on /mss can be cached.");
+        }
+        Process process = null;
+        try {
+            process = new ProcessBuilder("jcache", "submit", "default", file.getPath()).start();
+        } catch (final IOException e) {
+            throw new RuntimeException(e);
+        }
+        String output = null;
+        try {
+            output = readFully(process.getInputStream(), "US-ASCII");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        Integer requestId = Integer.parseInt(output.substring(output.indexOf("'") + 1, output.lastIndexOf("'")));
+        FileInfo fileInfo = new FileInfo(file, requestId);
+        fileInfos.put(file, fileInfo);
+    }
+
+    // <?xml version="1.0"?><jcache><request
+    // id="5123929"><user>jeremym</user><family>default</family><status>active</status><file
+    // id="1"><path>/cache/mss/hallb/hps/production/slic/tritrig/2pt2/tritrigv1_s2d6_10.slcio</path><status>pending</status></file></request></jcache>
+
+    private static Document buildDocument(String xmlString) {
+        SAXBuilder builder = new SAXBuilder();
+        Document document = null;
+        try {
+            builder.build(new InputSource(new StringReader(xmlString)));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return document;
+    }
+
+    boolean waitForAll(int maxWaitMillis) {
+        boolean allCached = false;
+        long elapsed = 0;
+        while (!allCached) {
+            boolean cacheCheck = true;
+            for (Entry<File, FileInfo> entry : fileInfos.entrySet()) {
+                // TODO: Should also check the status here.
+                if (!entry.getValue().getCachedFile().exists()) {
+                    cacheCheck = false;
+                    break;
+                }
+            }
+            if (cacheCheck) {
+                allCached = true;
+                break;
+            }
+            elapsed = System.currentTimeMillis();
+            if (elapsed > maxWaitMillis) {
+                break;
+            }
+            Object lock = new Object();
+            synchronized(lock) {
+                try {
+                    lock.wait(5000); // Wait 5 seconds before re-polling the files.
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        return allCached;
+    }
+
+    private static String readFully(InputStream inputStream, String encoding) throws IOException {
+        return new String(readFully(inputStream), encoding);
+    }
+
+    private static byte[] readFully(InputStream inputStream) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        byte[] buffer = new byte[1024];
+        int length = 0;
+        while ((length = inputStream.read(buffer)) != -1) {
+            baos.write(buffer, 0, length);
+        }
+        return baos.toByteArray();
+    }
+}

Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunLog.java
 =============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunLog.java	(original)
+++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunLog.java	Thu May 14 12:44:21 2015
@@ -20,11 +20,11 @@
 
     Map<Integer, RunSummary> runs = new HashMap<Integer, RunSummary>();
 
-    void cache() {
-        for (final int run : getSortedRunNumbers()) {
-            this.runs.get(run).getFiles().cache();
-        }
-    }
+    // void cache() {
+    // for (final int run : getSortedRunNumbers()) {
+    // this.runs.get(run).getFiles().cache();
+    // }
+    // }
 
     public RunSummary getRunSummary(final int run) {
         if (!this.runs.containsKey(run)) {

Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunProcessor.java
 =============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunProcessor.java	(original)
+++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunProcessor.java	Thu May 14 12:44:21 2015
@@ -37,12 +37,18 @@
         if (this.processors.isEmpty()) {
             throw new RuntimeException("The processors list is empty.");
         }
+        
+        // Run the start of job hooks.
         for (final EvioEventProcessor processor : this.processors) {
             processor.startJob();
         }
+        
+        // Process the files.
         for (final File file : this.runSummary.getFiles()) {
             process(file);
         }
+        
+        // Run the end of job hooks.
         for (final EvioEventProcessor processor : this.processors) {
             processor.endJob();
         }
@@ -51,8 +57,13 @@
     private void process(final File file) throws EvioException, IOException, Exception {
         EvioReader reader = null;
         try {
+            // Open with wrapper method which will use the cached file path if necessary.
             reader = EvioFileUtilities.open(file);
-            this.runSummary.getFiles().computeEventCount(file);
+            
+            // Compute event count for the file and store the value.
+            this.runSummary.getFiles().computeEventCount(reader, file);
+            
+            // Process the events using the list of EVIO processors.
             EvioEvent event = null;
             while ((event = reader.parseNextEvent()) != null) {
                 for (final EvioEventProcessor processor : this.processors) {
@@ -65,4 +76,5 @@
             }
         }
     }
+   
 }

Top of Message | Previous Page | Permalink

Advanced Options


Options

Log In

Log In

Get Password

Get Password


Search Archives

Search Archives


Subscribe or Unsubscribe

Subscribe or Unsubscribe


Archives

November 2017
August 2017
July 2017
January 2017
December 2016
November 2016
October 2016
September 2016
August 2016
July 2016
June 2016
May 2016
April 2016
March 2016
February 2016
January 2016
December 2015
November 2015
October 2015
September 2015
August 2015
July 2015
June 2015
May 2015
April 2015
March 2015
February 2015
January 2015
December 2014
November 2014
October 2014
September 2014
August 2014
July 2014
June 2014
May 2014
April 2014
March 2014
February 2014
January 2014
December 2013
November 2013

ATOM RSS1 RSS2



LISTSERV.SLAC.STANFORD.EDU

Secured by F-Secure Anti-Virus CataList Email List Search Powered by the LISTSERV Email List Manager

Privacy Notice, Security Notice and Terms of Use