001 /** 002 * 003 */ 004 package org.wdssii.ltgingest; 005 006 import java.io.BufferedReader; 007 import java.io.File; 008 import java.io.FileWriter; 009 import java.io.InputStreamReader; 010 import java.io.PrintWriter; 011 import java.net.Socket; 012 import java.text.ParseException; 013 import java.text.SimpleDateFormat; 014 import java.util.ArrayList; 015 import java.util.Date; 016 import java.util.List; 017 import java.util.Timer; 018 import java.util.TimerTask; 019 020 import org.apache.commons.logging.Log; 021 import org.apache.commons.logging.LogFactory; 022 import org.wdssii.core.DataEncoder; 023 import org.wdssii.core.WDSSIIProgram; 024 025 /** 026 * 027 * Accesses ASCII text data available via telnet to a port on local machine, 028 * inserts the data into a LDM queue for rebroadcast. 029 * 030 * @author lakshman 031 * 032 */ 033 public class TelnetToLdm extends WDSSIIProgram { 034 private static Log log = LogFactory.getLog(TelnetToLdm.class); 035 036 private String machine = "localhost"; 037 private int port = 12345; 038 private int everyNseconds = 30; 039 private String ldmInsertCommand = "w2pqinsert.sh"; 040 private String dateFormatInLines = "MM/dd/yy, HH:mm:ss"; 041 private String outputDir = "/tmp/output"; 042 043 public String getMachine() { 044 return machine; 045 } 046 047 public void setMachine(String machine) { 048 this.machine = machine; 049 } 050 051 public int getPort() { 052 return port; 053 } 054 055 public void setPort(int port) { 056 this.port = port; 057 } 058 059 public int getEveryNseconds() { 060 return everyNseconds; 061 } 062 063 public void setEveryNseconds(int everyNseconds) { 064 this.everyNseconds = everyNseconds; 065 } 066 067 public String getLdmInsertCommand() { 068 return ldmInsertCommand; 069 } 070 071 public void setLdmInsertCommand(String ldmInsertCommand) { 072 this.ldmInsertCommand = ldmInsertCommand; 073 } 074 075 public String getDateFormatInLines() { 076 return dateFormatInLines; 077 } 078 079 public void setDateFormatInLines(String dateFormatInLines) { 080 this.dateFormatInLines = dateFormatInLines; 081 } 082 083 public String getOutputDir() { 084 return outputDir; 085 } 086 087 public void setOutputDir(String outputDir) { 088 this.outputDir = outputDir; 089 } 090 091 @Override 092 public void execute() { 093 try { 094 new File(outputDir).mkdirs(); 095 096 // create Timer to write data. This runs in separate thread 097 new Timer().schedule(new TimerTask(){ 098 public void run(){ 099 writeData(); 100 } 101 }, everyNseconds * 1000, everyNseconds * 1000); // task, delay, period 102 103 // connect and wait on port using main thread 104 Socket s = new Socket(machine, port); 105 BufferedReader reader = new BufferedReader(new InputStreamReader(s 106 .getInputStream())); 107 String line; 108 while ((line = reader.readLine()) != null) { 109 updateData(line); 110 } 111 } catch (Exception e) { 112 log.error("Failed:", e); 113 } 114 } 115 116 private List<String> lines = new ArrayList<String>(); 117 private Date lastWrittenTime = null; 118 119 private synchronized void updateData(String line) { 120 lines.add(line); 121 } 122 123 private synchronized void writeData() { 124 Date outputFileTime = null; 125 PrintWriter writer = null; 126 try{ 127 // get time of output file 128 if (lines.size() > 0){ 129 // time stamp in first line 130 String firstLine = lines.get(0); 131 SimpleDateFormat sdf = new SimpleDateFormat(dateFormatInLines); 132 try{ 133 outputFileTime = sdf.parse(firstLine.substring(0,dateFormatInLines.length())); 134 } catch (ParseException e){ 135 log.warn("Corrupt data received:", e); 136 } 137 } 138 if (outputFileTime == null){ 139 if (lastWrittenTime != null){ 140 outputFileTime = new Date( lastWrittenTime.getTime() + everyNseconds * 1000 ); 141 } else { 142 outputFileTime = new Date(); // current time 143 } 144 } 145 146 // write output file 147 File f = new File(outputDir + "/ltg_" + DataEncoder.getFormattedDate(outputFileTime) + ".txt"); 148 writer = new PrintWriter(new FileWriter(f)); 149 for (String line : lines){ 150 writer.println(line); 151 } 152 writer.close(); 153 writer = null; 154 log.info("Wrote " + lines.size() + " lines to " + f); 155 156 // clear the data for next accumulation round 157 lines.clear(); 158 159 // insert into LDM 160 Runtime.getRuntime().exec(new String[]{ ldmInsertCommand, f.getAbsolutePath()}); 161 162 } catch (Exception e){ 163 log.warn("Unable to write data", e); 164 } finally { 165 lastWrittenTime = outputFileTime; 166 if (writer != null){ 167 writer.close(); 168 } 169 } 170 } 171 172 public static void main(String[] args) { 173 WDSSIIProgram program = new TelnetToLdm(); 174 program.setupAndExecute(args); 175 } 176 177 }