Author: [log in to unmask]
Date: Fri May 15 16:53:27 2015
New Revision: 2978
Log:
Add miscellaneous updates before testing at JLAB.
Modified:
java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileCrawler.java
java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileUtilities.java
java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/JCacheManager.java
Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileCrawler.java
=============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileCrawler.java (original)
+++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileCrawler.java Fri May 15 16:53:27 2015
@@ -45,6 +45,7 @@
OPTIONS.addOption("u", "update", false, "update the run database");
OPTIONS.addOption("e", "epics", false, "process EPICS data");
OPTIONS.addOption("c", "cache", false, "cache all files from MSS");
+ OPTIONS.addOption("w", "wait", true, "total wait time to allow for file caching operations");
}
public static void main(final String[] args) {
@@ -72,6 +73,8 @@
private boolean update = false;
private boolean cache = false;
+
+ private Long waitTime;
private RunProcessor createRunProcessor(final RunSummary runSummary) {
final RunProcessor processor = new RunProcessor(runSummary);
@@ -142,6 +145,10 @@
if (cl.hasOption("c")) {
this.cache = true;
}
+
+ if (cl.hasOption("w")) {
+ this.waitTime = Long.parseLong(cl.getOptionValue("w"));
+ }
} catch (final ParseException e) {
throw new RuntimeException("Error parsing options.", e);
@@ -151,29 +158,45 @@
}
private void cacheFiles(final RunLog runs) {
+
JCacheManager cache = new JCacheManager();
+
+ if (this.waitTime != null) {
+ LOGGER.config("set jcache wait time to " + waitTime + " millis");
+ cache.setWaitTime(waitTime);
+ }
// Process all files in the runs.
for (final int run : runs.getSortedRunNumbers()) {
+
+ LOGGER.info("processing run " + run + " files ...");
// Get the run summary for the run.
final RunSummary runSummary = runs.getRunSummary(run);
// Cache all the files.
+ LOGGER.info("caching " + runSummary.getFiles().size() + " files ...");
cache.cache(runSummary.getFiles());
- // Wait for cache operation to complete. (~5 minutes max)
- boolean cached = cache.waitForAll(300000);
+ // Wait for cache operation to complete.
+ boolean cached = cache.waitForAll();
+
+ LOGGER.info("files were cached: " + cached);
if (!cached) {
throw new RuntimeException("The cache operation did not complete in time.");
}
+
+ LOGGER.info("done caching run " + run);
}
}
private void processRuns(final RunLog runs) throws Exception {
+
// Process all files in the runs.
for (final int run : runs.getSortedRunNumbers()) {
+
+ LOGGER.info("processing run " + run + " ...");
// Get the run summary for the run.
final RunSummary runSummary = runs.getRunSummary(run);
@@ -183,6 +206,8 @@
// Process the run, updating the run summary.
processor.process();
+
+ LOGGER.info("done processing run " + run);
}
}
Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileUtilities.java
=============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileUtilities.java (original)
+++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/EvioFileUtilities.java Fri May 15 16:53:27 2015
@@ -138,7 +138,7 @@
static EvioReader open(final File file) throws IOException, EvioException {
File openFile = file;
- if (isTapeFile(file)) {
+ if (isMssFile(file)) {
openFile = getCachedFile(file);
}
final long start = System.currentTimeMillis();
@@ -153,7 +153,7 @@
}
static File getCachedFile(File file) {
- if (!isTapeFile(file)) {
+ if (!isMssFile(file)) {
throw new IllegalArgumentException("File " + file.getPath() + " is not on the JLab MSS.");
}
if (isCachedFile(file)) {
@@ -162,7 +162,7 @@
return new File("/cache" + file.getPath());
}
- static boolean isTapeFile(File file) {
+ static boolean isMssFile(File file) {
return file.getPath().startsWith("/mss");
}
Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/JCacheManager.java
=============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/JCacheManager.java (original)
+++ java/trunk/record-util/src/main/java/org/hps/record/evio/crawler/JCacheManager.java Fri May 15 16:53:27 2015
@@ -9,23 +9,39 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.input.SAXBuilder;
+import org.lcsim.util.log.DefaultLogFormatter;
+import org.lcsim.util.log.LogUtil;
import org.xml.sax.InputSource;
-// sample path => /mss/hallb/hps/data/cosmic_002713.evio.0
+/**
+ * Utility class for using the <i>jcache</i> command at JLAB.
+ */
public class JCacheManager {
- Map<File, FileInfo> fileInfos = new HashMap<File, FileInfo>();
-
+ private static Logger LOGGER = LogUtil.create(JCacheManager.class, new DefaultLogFormatter(), Level.ALL);
+
+ private Map<File, FileInfo> fileInfos = new HashMap<File, FileInfo>();
+
+ private static long DEFAULT_MAX_WAIT_TIME = 300000;
+
+ private long maxWaitTime = DEFAULT_MAX_WAIT_TIME;
+
+ private static final long POLL_WAIT_TIME = 10000;
+
static class FileInfo {
private Integer requestId = null;
private File file = null;
+ private boolean cached = false;
+ private Process process = null;
- FileInfo(File file, Integer requestId) {
+ /**
+ * Create the bookkeeping record for a single jcache request.
+ *
+ * @param file the file that was submitted for caching
+ * @param requestId the ID of the jcache request
+ * @param process the process that ran the jcache command
+ */
+ FileInfo(File file, Integer requestId, Process process) {
+ // Store every argument: update() reads the file and getProcess() returns the process.
+ this.file = file;
this.requestId = requestId;
+ this.process = process;
}
@@ -36,8 +52,25 @@
Integer getRequestId() {
return requestId;
}
+
+ Process getProcess() {
+ return process;
+ }
+
+ boolean isCached() {
+ return cached;
+ }
+
+ void update() {
+ if (!isCached()) {
+ if (!isPending() && getCachedFile().exists()) {
+ LOGGER.info("file " + file.getPath() + " is cached");
+ this.cached = true;
+ }
+ }
+ }
- String getStatus(File file) {
+ String getStatus() {
Process process = null;
try {
process = new ProcessBuilder("jcache", "request", requestId.toString()).start();
@@ -55,7 +88,14 @@
String status = root.getChild("request").getChildText("status");
return status;
}
-
+
+ /**
+ * Check whether jcache still reports this request as pending.
+ *
+ * @return <code>true</code> if the status string from getStatus() equals "pending"
+ */
+ boolean isPending() {
+ // The literal is on the left so a null status yields false instead of throwing.
+ return "pending".equals(getStatus());
+ }
+ }
+
+ void setWaitTime(long maxWaitTime) {
+ this.maxWaitTime = maxWaitTime;
}
void cache(List<File> files) {
@@ -65,9 +105,9 @@
}
void cache(File file) {
- // LOGGER.info("running cache commands ...");
- if (!EvioFileUtilities.isCachedFile(file)) {
- throw new IllegalArgumentException("Only files on /mss can be cached.");
+ if (!EvioFileUtilities.isMssFile(file)) {
+ LOGGER.severe("file " + file.getPath() + " is not on the MSS");
+ throw new IllegalArgumentException("Only files on the MSS can be cached.");
}
Process process = null;
try {
@@ -81,14 +121,11 @@
} catch (IOException e) {
throw new RuntimeException(e);
}
- Integer requestId = Integer.parseInt(output.substring(output.indexOf("'") + 1, output.lastIndexOf("'")));
- FileInfo fileInfo = new FileInfo(file, requestId);
+ Integer requestId = Integer.parseInt(output.substring(output.indexOf("'") + 1, output.lastIndexOf("'")));
+ FileInfo fileInfo = new FileInfo(file, requestId, process);
fileInfos.put(file, fileInfo);
+ LOGGER.info("jcache submitted for " + file.getPath() + " with req ID '" + requestId + "' and process " + process);
}
-
- // <?xml version="1.0"?><jcache><request
- // id="5123929"><user>jeremym</user><family>default</family><status>active</status><file
- // id="1"><path>/cache/mss/hallb/hps/production/slic/tritrig/2pt2/tritrigv1_s2d6_10.slcio</path><status>pending</status></file></request></jcache>
private static Document buildDocument(String xmlString) {
SAXBuilder builder = new SAXBuilder();
@@ -101,36 +138,46 @@
return document;
}
- boolean waitForAll(int maxWaitMillis) {
- boolean allCached = false;
+ /**
+ * Poll the submitted files until all of them are cached or the maximum wait time is exceeded.
+ *
+ * @return <code>true</code> if every file was found to be cached before the wait time ran out
+ * @throws IllegalStateException if no files have been submitted for caching
+ */
+ boolean waitForAll() {
+ if (this.fileInfos.isEmpty()) {
+ throw new IllegalStateException("There are no files being cached.");
+ }
+ LOGGER.info("waiting for files to be cached ...");
+ // Record the start so the timeout is measured relative to this call, not to the epoch.
+ final long start = System.currentTimeMillis();
long elapsed = 0;
- while (!allCached) {
- boolean cacheCheck = true;
- for (Entry<File, FileInfo> entry : fileInfos.entrySet()) {
- // TODO: Should also check the status here.
- if (!entry.getValue().getCachedFile().exists()) {
- cacheCheck = false;
- break;
+ boolean cached = false;
+ while (!cached) {
+ boolean check = true;
+ INFO_LOOP: for (Entry<File, FileInfo> entry : fileInfos.entrySet()) {
+ FileInfo info = entry.getValue();
+ info.update();
+ if (!info.isCached()) {
+ LOGGER.info(entry.getKey() + " is not cached yet");
+ check = false;
+ break INFO_LOOP;
}
}
- if (cacheCheck) {
- allCached = true;
+ if (check) {
+ cached = true;
break;
}
+
- elapsed = System.currentTimeMillis();
- if (elapsed > maxWaitMillis) {
+ elapsed = System.currentTimeMillis() - start;
+ LOGGER.info("elapsed time: " + elapsed + " ms");
+ if (elapsed > maxWaitTime) {
break;
}
Object lock = new Object();
synchronized(lock) {
try {
- lock.wait(5000); // Wait 5 seconds before re-polling the files.
+ LOGGER.info("waiting " + POLL_WAIT_TIME + " ms before checking again ...");
+ lock.wait(POLL_WAIT_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- return allCached;
+ LOGGER.info("files were cached: " + cached);
+ return cached;
}
private static String readFully(InputStream inputStream, String encoding) throws IOException {
|