Author: [log in to unmask]
Date: Thu May 14 12:44:21 2015
New Revision: 2969
Log:
Update file crawler prior to testing at JLAB.
Added:
java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/JCacheManager.java
Modified:
java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileCrawler.java
java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileList.java
java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileUtilities.java
java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunLog.java
java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunProcessor.java
Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileCrawler.java
=============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileCrawler.java (original)
+++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileCrawler.java Thu May 14 12:44:21 2015
@@ -40,11 +40,11 @@
"timestamp file for date filtering; modified time will be set at end of job");
OPTIONS.addOption("d", "directory", true, "starting directory");
OPTIONS.addOption("r", "runs", true, "list of runs to accept (others will be excluded)");
- OPTIONS.addOption("c", "cache", false, "cache files to /cache/mss from MSS (only works at JLAB)");
OPTIONS.addOption("s", "summary", false, "print run summary at end of job");
OPTIONS.addOption("L", "log-level", true, "set log level (INFO, FINE, etc.)");
OPTIONS.addOption("u", "update", false, "update the run database");
OPTIONS.addOption("e", "epics", false, "process EPICS data");
+ OPTIONS.addOption("c", "cache", false, "cache all files from MSS");
}
public static void main(final String[] args) {
@@ -57,21 +57,21 @@
private final Set<Integer> acceptRuns = new HashSet<Integer>();
+ private boolean epics = false;
+
+ private final PosixParser parser = new PosixParser();
+
+ private boolean printSummary = false;
+
+ private File rootDir = new File(System.getProperty("user.dir"));
+
+ private Date timestamp = null;
+
+ private File timestampFile = null;
+
+ private boolean update = false;
+
private boolean cache = false;
-
- private boolean epics = false;
-
- private final PosixParser parser = new PosixParser();
-
- private boolean printSummary = false;
-
- private File rootDir = new File(System.getProperty("user.dir"));
-
- private Date timestamp = null;
-
- private File timestampFile = null;
-
- private boolean update = false;
private RunProcessor createRunProcessor(final RunSummary runSummary) {
final RunProcessor processor = new RunProcessor(runSummary);
@@ -134,20 +134,15 @@
if (cl.hasOption("u")) {
this.update = true;
}
-
+
+ if (cl.hasOption("e")) {
+ this.epics = true;
+ }
+
if (cl.hasOption("c")) {
this.cache = true;
}
- if (this.cache && (this.printSummary || this.update)) {
- // If file caching is selected, then printing run summary or updating the database won't work.
- throw new IllegalArgumentException("File caching cannot be activated with the -p or -u options.");
- }
-
- if (cl.hasOption("e")) {
- this.epics = true;
- }
-
} catch (final ParseException e) {
throw new RuntimeException("Error parsing options.", e);
}
@@ -155,16 +150,37 @@
return this;
}
+ /**
+  * Caches the files of every run in the log, one run at a time, before they are processed.
+  *
+  * @param runs the run log whose files should be cached
+  * @throws RuntimeException if a run's files are not reported as cached within the wait limit
+  */
+ private void cacheFiles(final RunLog runs) {
+     final JCacheManager cacheManager = new JCacheManager();
+
+     for (final int runNumber : runs.getSortedRunNumbers()) {
+
+         // Submit the cache requests for all files belonging to this run.
+         cacheManager.cache(runs.getRunSummary(runNumber).getFiles());
+
+         // Block until the manager reports completion; 300000 is passed as the maximum wait (~5 minutes).
+         if (!cacheManager.waitForAll(300000)) {
+             throw new RuntimeException("The cache operation did not complete in time.");
+         }
+     }
+ }
+
+
private void processRuns(final RunLog runs) throws Exception {
// Process all files in the runs.
for (final int run : runs.getSortedRunNumbers()) {
-
+
// Get the run summary for the run.
final RunSummary runSummary = runs.getRunSummary(run);
-
+
// Create a processor to process all the EVIO records in the run.
final RunProcessor processor = createRunProcessor(runSummary);
-
+
// Process the run, updating the run summary.
processor.process();
}
@@ -192,27 +208,27 @@
LOGGER.fine("sorting files by sequence ...");
runs.sortAllFiles();
-
+
+ // Cache all the files to disk before processing them.
if (this.cache) {
- // Cache files from MSS.
- runs.cache();
- } else {
-
- processRuns(runs);
-
- // Print the run summaries.
- if (this.printSummary) {
- runs.printRunSummaries();
- }
-
- // Insert run information into database.
- if (this.update) {
- // Update run log.
- runs.insert();
- }
- }
-
- // Update timestamp file.
+ cacheFiles(runs);
+ }
+
+ // Process all the files in the runs.
+ processRuns(runs);
+
+ // Print the run summaries.
+ if (this.printSummary) {
+ runs.printRunSummaries();
+ }
+
+ // Insert run information into the database.
+ if (this.update) {
+ // Update run log.
+ runs.insert();
+ }
+
+ // Update the timestamp file.
if (this.timestampFile == null) {
this.timestampFile = new File("timestamp");
try {
Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileList.java
=============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileList.java (original)
+++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileList.java Thu May 14 12:44:21 2015
@@ -22,32 +22,12 @@
Map<File, Integer> eventCounts = new HashMap<File, Integer>();
- void cache() {
- LOGGER.info("running cache commands ...");
- for (final File file : this) {
- EvioFileUtilities.cache(file);
- }
- LOGGER.info("done running cache commands");
- }
-
- void computeEventCount(final File file) {
+ void computeEventCount(EvioReader reader, final File file) throws IOException, EvioException {
+ if (!reader.getPath().equals(file.getPath())) {
+ throw new IllegalArgumentException("The EvioReader and file paths do not match.");
+ }
LOGGER.info("computing event count for " + file.getPath() + " ...");
- int eventCount = 0;
- EvioReader reader = null;
- try {
- reader = new EvioReader(file, false);
- eventCount += reader.getEventCount();
- } catch (EvioException | IOException e) {
- throw new RuntimeException(e);
- } finally {
- if (reader != null) {
- try {
- reader.close();
- } catch (final IOException e) {
- e.printStackTrace();
- }
- }
- }
+ int eventCount = reader.getEventCount();
this.eventCounts.put(file, eventCount);
LOGGER.info("done computing event count for " + file.getPath());
}
@@ -68,8 +48,8 @@
}
int getEventCount(final File file) {
- if (!this.eventCounts.containsKey(file)) {
- computeEventCount(file);
+ if (this.eventCounts.get(file) == null) {
+ throw new RuntimeException("The event count for " + file.getPath() + " was never computed.");
}
return this.eventCounts.get(file);
}
Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileUtilities.java
=============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileUtilities.java (original)
+++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileUtilities.java Thu May 14 12:44:21 2015
@@ -18,18 +18,6 @@
private static final Logger LOGGER = LogUtil.create(EvioFileUtilities.class);
private static final long MILLISECONDS = 1000L;
-
- static void cache(final File file) {
- if (!file.getPath().startsWith("/mss")) {
- throw new IllegalArgumentException("Only files on /mss can be cached.");
- }
- try {
- new ProcessBuilder("jcache", "submit", "default", file.getPath()).start();
- } catch (final IOException e) {
- throw new RuntimeException(e);
- }
- LOGGER.info("process started to cache " + file.getPath());
- }
static Date getControlDate(final File file, final int eventTag, final int gotoEvent) {
Date date = null;
@@ -149,10 +137,36 @@
}
static EvioReader open(final File file) throws IOException, EvioException {
+ File openFile = file;
+ if (isTapeFile(file)) {
+ openFile = getCachedFile(file);
+ }
final long start = System.currentTimeMillis();
- final EvioReader reader = new EvioReader(file, false, false);
+ final EvioReader reader = new EvioReader(openFile, false, false);
final long end = System.currentTimeMillis() - start;
- LOGGER.info("opened " + file.getPath() + " in " + end / MILLISECONDS + " seconds");
+ LOGGER.info("opened " + openFile.getPath() + " in " + end / MILLISECONDS + " seconds");
return reader;
}
+
+ /**
+  * Opens an EVIO reader for a file given by its path string.
+  *
+  * @param path the path of the file to open
+  * @return the reader returned by the <code>File</code> overload of this method
+  * @throws IOException if the <code>File</code> overload throws it
+  * @throws EvioException if the <code>File</code> overload throws it
+  */
+ static EvioReader open(final String path) throws IOException, EvioException {
+     final File target = new File(path);
+     return open(target);
+ }
+
+ /**
+  * Maps a tape file path to its location on the cache disk by prefixing it with <code>/cache</code>.
+  *
+  * @param file a file whose path starts with <code>/mss</code>
+  * @return a new <code>File</code> whose path is <code>/cache</code> followed by the original path
+  * @throws IllegalArgumentException if the path already starts with <code>/cache</code> or does not
+  *         start with <code>/mss</code>
+  */
+ static File getCachedFile(File file) {
+     // Test for an already-cached path first: a path that passes the /mss test can never start
+     // with /cache, so checking in the opposite order made this error unreachable.
+     if (isCachedFile(file)) {
+         throw new IllegalArgumentException("File " + file.getPath() + " is already on the cache disk.");
+     }
+     if (!isTapeFile(file)) {
+         throw new IllegalArgumentException("File " + file.getPath() + " is not on the JLab MSS.");
+     }
+     return new File("/cache" + file.getPath());
+ }
+
+ /**
+  * Tells whether a file's path starts with <code>/mss</code>.
+  *
+  * @param file the file to test
+  * @return <code>true</code> if the path begins with <code>/mss</code>
+  */
+ static boolean isTapeFile(File file) {
+     final String path = file.getPath();
+     return path.startsWith("/mss");
+ }
+
+ /**
+  * Tells whether a file's path starts with <code>/cache</code>.
+  *
+  * @param file the file to test
+  * @return <code>true</code> if the path begins with <code>/cache</code>
+  */
+ static boolean isCachedFile(File file) {
+     final String path = file.getPath();
+     return path.startsWith("/cache");
+ }
}
Added: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/JCacheManager.java
=============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/JCacheManager.java (added)
+++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/JCacheManager.java Thu May 14 12:44:21 2015
@@ -0,0 +1,149 @@
+package org.hps.record.evio.crawler;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.input.SAXBuilder;
+import org.xml.sax.InputSource;
+
+// sample path => /mss/hallb/hps/data/cosmic_002713.evio.0
+/**
+ * Submits files to the <code>jcache</code> command line tool and polls the file system until the
+ * corresponding copies exist under <code>/cache</code>.
+ * <p>
+ * Each submitted file is tracked together with the request ID parsed from the tool's output.
+ */
+public class JCacheManager {
+
+    /** Delay between successive polls of the cache disk, in milliseconds. */
+    private static final long POLL_INTERVAL_MILLIS = 5000L;
+
+    /** Cache requests submitted so far, keyed by the file that was submitted. */
+    Map<File, FileInfo> fileInfos = new HashMap<File, FileInfo>();
+
+    /**
+     * Bookkeeping for a single submitted cache request.
+     */
+    static class FileInfo {
+
+        private Integer requestId = null;
+        private File file = null;
+
+        /**
+         * @param file the file that was submitted for caching
+         * @param requestId the request ID parsed from the submit command's output
+         */
+        FileInfo(File file, Integer requestId) {
+            // The file must be stored here; getCachedFile() dereferences it.
+            this.file = file;
+            this.requestId = requestId;
+        }
+
+        /**
+         * @return the submitted file's path prefixed with <code>/cache</code>
+         */
+        File getCachedFile() {
+            return new File("/cache" + file.getPath());
+        }
+
+        /**
+         * @return the request ID given at construction
+         */
+        Integer getRequestId() {
+            return requestId;
+        }
+
+        /**
+         * Runs <code>jcache request &lt;id&gt;</code> and extracts the request's status text from the
+         * XML it prints.
+         *
+         * @param file not used by this method; kept so the signature stays unchanged
+         * @return the text of the <code>status</code> child of the <code>request</code> element
+         * @throws RuntimeException if the command cannot be run, its output cannot be read or parsed,
+         *         or the XML has no <code>request</code> element
+         */
+        String getStatus(File file) {
+            final String xmlString;
+            try {
+                final Process process = new ProcessBuilder("jcache", "request", requestId.toString()).start();
+                xmlString = readFully(process.getInputStream(), "US-ASCII");
+            } catch (final IOException e) {
+                throw new RuntimeException(e);
+            }
+            final Element root = buildDocument(xmlString).getRootElement();
+            final Element request = root.getChild("request");
+            if (request == null) {
+                throw new RuntimeException("No request element in jcache output for request " + requestId + ".");
+            }
+            return request.getChildText("status");
+        }
+
+    }
+
+    /**
+     * Submits every file in the list for caching.
+     *
+     * @param files the files to submit
+     */
+    void cache(List<File> files) {
+        for (File file : files) {
+            cache(file);
+        }
+    }
+
+    /**
+     * Submits one file with <code>jcache submit default &lt;path&gt;</code> and records the request.
+     *
+     * @param file the file to cache; its path must start with <code>/mss</code>
+     * @throws IllegalArgumentException if the file is not a tape file
+     * @throws RuntimeException if the command cannot be run or its output cannot be read or parsed
+     */
+    void cache(File file) {
+        // Only tape files are accepted. The previous guard tested isCachedFile(), which rejected
+        // every /mss path and so made this method unusable.
+        if (!EvioFileUtilities.isTapeFile(file)) {
+            throw new IllegalArgumentException("Only files on /mss can be cached.");
+        }
+        final String output;
+        try {
+            final Process process = new ProcessBuilder("jcache", "submit", "default", file.getPath()).start();
+            output = readFully(process.getInputStream(), "US-ASCII");
+        } catch (final IOException e) {
+            throw new RuntimeException(e);
+        }
+        fileInfos.put(file, new FileInfo(file, parseRequestId(output)));
+    }
+
+    /**
+     * Extracts the integer found between the first and last single quote of the submit output.
+     *
+     * @param output the text printed by the submit command
+     * @return the parsed request ID
+     * @throws RuntimeException if the output does not contain a pair of single quotes
+     * @throws NumberFormatException if the quoted text is not an integer
+     */
+    private static Integer parseRequestId(String output) {
+        final int open = output.indexOf('\'');
+        final int close = output.lastIndexOf('\'');
+        if (open < 0 || close <= open) {
+            throw new RuntimeException("Could not find a quoted request ID in jcache output: " + output);
+        }
+        return Integer.valueOf(output.substring(open + 1, close));
+    }
+
+    // Sample output of the request command:
+    // <?xml version="1.0"?><jcache><request
+    // id="5123929"><user>jeremym</user><family>default</family><status>active</status><file
+    // id="1"><path>/cache/mss/hallb/hps/production/slic/tritrig/2pt2/tritrigv1_s2d6_10.slcio</path><status>pending</status></file></request></jcache>
+
+    /**
+     * Parses an XML string into a JDOM document.
+     *
+     * @param xmlString the XML text
+     * @return the parsed document, never <code>null</code>
+     * @throws RuntimeException if parsing fails
+     */
+    private static Document buildDocument(String xmlString) {
+        try {
+            // Return the built document; it was previously discarded, so callers always got null.
+            return new SAXBuilder().build(new InputSource(new StringReader(xmlString)));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to parse jcache XML output.", e);
+        }
+    }
+
+    /**
+     * Polls until the cached copy of every submitted file exists, or the time limit is exceeded.
+     *
+     * @param maxWaitMillis the maximum time to keep polling, in milliseconds, measured from the call
+     * @return <code>true</code> if all cached files exist; <code>false</code> on timeout or if the
+     *         calling thread is interrupted while waiting
+     */
+    boolean waitForAll(int maxWaitMillis) {
+        // Measure elapsed time from a start point; comparing the absolute clock value to the
+        // limit made the timeout trip after the very first poll.
+        final long start = System.currentTimeMillis();
+        while (true) {
+            if (allCachedFilesExist()) {
+                return true;
+            }
+            if (System.currentTimeMillis() - start > maxWaitMillis) {
+                return false;
+            }
+            try {
+                Thread.sleep(POLL_INTERVAL_MILLIS); // Pause before re-polling the files.
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the caller and stop waiting.
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+    }
+
+    /**
+     * Checks the file system for the cached copy of every submitted file.
+     *
+     * @return <code>true</code> if every cached file exists (also when nothing was submitted)
+     */
+    private boolean allCachedFilesExist() {
+        // TODO: Should also check the status here.
+        for (FileInfo info : fileInfos.values()) {
+            if (!info.getCachedFile().exists()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Reads a stream to its end and decodes the bytes with the named charset.
+     *
+     * @param inputStream the stream to drain
+     * @param encoding the charset name
+     * @return the decoded text
+     * @throws IOException if reading fails or the charset name is not supported
+     */
+    private static String readFully(InputStream inputStream, String encoding) throws IOException {
+        return new String(readFully(inputStream), encoding);
+    }
+
+    /**
+     * Reads a stream to its end.
+     *
+     * @param inputStream the stream to drain
+     * @return all bytes read from the stream
+     * @throws IOException if reading fails
+     */
+    private static byte[] readFully(InputStream inputStream) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        byte[] buffer = new byte[1024];
+        int length = 0;
+        while ((length = inputStream.read(buffer)) != -1) {
+            baos.write(buffer, 0, length);
+        }
+        return baos.toByteArray();
+    }
+}
Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunLog.java
=============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunLog.java (original)
+++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunLog.java Thu May 14 12:44:21 2015
@@ -20,11 +20,11 @@
Map<Integer, RunSummary> runs = new HashMap<Integer, RunSummary>();
- void cache() {
- for (final int run : getSortedRunNumbers()) {
- this.runs.get(run).getFiles().cache();
- }
- }
+ // void cache() {
+ // for (final int run : getSortedRunNumbers()) {
+ // this.runs.get(run).getFiles().cache();
+ // }
+ // }
public RunSummary getRunSummary(final int run) {
if (!this.runs.containsKey(run)) {
Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunProcessor.java
=============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunProcessor.java (original)
+++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/RunProcessor.java Thu May 14 12:44:21 2015
@@ -37,12 +37,18 @@
if (this.processors.isEmpty()) {
throw new RuntimeException("The processors list is empty.");
}
+
+ // Run the start of job hooks.
for (final EvioEventProcessor processor : this.processors) {
processor.startJob();
}
+
+ // Process the files.
for (final File file : this.runSummary.getFiles()) {
process(file);
}
+
+ // Run the end of job hooks.
for (final EvioEventProcessor processor : this.processors) {
processor.endJob();
}
@@ -51,8 +57,13 @@
private void process(final File file) throws EvioException, IOException, Exception {
EvioReader reader = null;
try {
+ // Open with wrapper method which will use the cached file path if necessary.
reader = EvioFileUtilities.open(file);
- this.runSummary.getFiles().computeEventCount(file);
+
+ // Compute event count for the file and store the value.
+ this.runSummary.getFiles().computeEventCount(reader, file);
+
+ // Process the events using the list of EVIO processors.
EvioEvent event = null;
while ((event = reader.parseNextEvent()) != null) {
for (final EvioEventProcessor processor : this.processors) {
@@ -65,4 +76,5 @@
}
}
}
+
}
|