001    /**
002     * 
003     */
004    package org.wdssii.polarmerger;
005    
006    import java.util.ArrayList;
007    import java.util.Date;
008    import java.util.List;
009    
010    import org.apache.commons.logging.Log;
011    import org.apache.commons.logging.LogFactory;
012    import org.wdssii.core.Algorithm;
013    import org.wdssii.core.BuilderFactory;
014    import org.wdssii.core.DataEncoder;
015    import org.wdssii.core.DataType;
016    import org.wdssii.core.IndexRecord;
017    import org.wdssii.core.Location;
018    import org.wdssii.core.PolarGrid;
019    import org.wdssii.core.Radial;
020    import org.wdssii.core.RadialSet;
021    
022    /**
023     * @author lakshman
024     * 
025     */
026    public class PolarMerger extends Algorithm {
027            private static Log log = LogFactory.getLog(PolarMerger.class);
028    
029            public static void main(String[] args) {
030                    final PolarMerger alg = new PolarMerger();
031                    alg.setupAndExecute(args);
032            }
033    
034            private String inputType = "Reflectivity";
035    
036            private boolean compositeEnabled = true;
037    
038            private int writeIntervalSeconds = 60;
039    
040            private boolean writeAfterEveryInputReceived = false;
041            
042            private float maxElevation = 20.0f;
043    
044            private float azimuthalSpacingDegrees = 0.5f;
045    
046            private float gateWidthKms = 0.5f;
047    
048            //private int numAz = Math.round(360.0f / azimuthalSpacingDegrees);
049    
050            private int numGates = 500;
051    
052            private float ageOffInMinutes = 10;
053    
054            private int numElev = Math.round(maxElevation / azimuthalSpacingDegrees);
055    
056            public enum MergerStrategy { DistanceWeighted, NearestNeighbor, Latest };
057            
058            private MergerStrategy mergerStrategy = MergerStrategy.DistanceWeighted;
059            
060            public float getAzimuthalSpacingDegrees() {
061                    return azimuthalSpacingDegrees;
062            }
063    
064            public void setAzimuthalSpacingDegrees(float azimuthalSpacingDegrees) {
065                    this.azimuthalSpacingDegrees = azimuthalSpacingDegrees;
066                    reset();
067            }
068    
069            public boolean isCompositeEnabled() {
070                    return compositeEnabled;
071            }
072    
073            public void setCompositeEnabled(boolean compositeEnabled) {
074                    this.compositeEnabled = compositeEnabled;
075            }
076    
077            public float getMaxElevation() {
078                    return maxElevation;
079            }
080    
081            public void setMaxElevation(float elevationDegrees) {
082                    this.maxElevation = elevationDegrees;
083                    reset();
084            }
085    
086            private void reset() {
087                    this.observations = null;
088                    this.lookup = null;
089                    //this.numAz = Math.round(360.0f / azimuthalSpacingDegrees);
090                    this.numElev = Math.round(maxElevation / azimuthalSpacingDegrees);
091            }
092    
093            public float getGateWidthKms() {
094                    return gateWidthKms;
095            }
096    
097            public void setGateWidthKms(float gateWidthKms) {
098                    this.gateWidthKms = gateWidthKms;
099            }
100    
101            public String getInputType() {
102                    return inputType;
103            }
104    
105            public void setInputType(String inputType) {
106                    this.inputType = inputType;
107            }
108    
109            public int getNumGates() {
110                    return numGates;
111            }
112    
113            public void setNumGates(int numGates) {
114                    this.numGates = numGates;
115            }
116    
117            public int getWriteIntervalSeconds() {
118                    return writeIntervalSeconds;
119            }
120    
121            public void setWriteIntervalSeconds(int writeIntervalSeconds) {
122                    this.writeIntervalSeconds = writeIntervalSeconds;
123            }
124    
125            public float getAgeOffInMinutes() {
126                    return ageOffInMinutes;
127            }
128    
129            public void setAgeOffInMinutes(float ageOffInMinutes) {
130                    this.ageOffInMinutes = ageOffInMinutes;
131            }
132    
133            public String getOutputType() {
134                    return outputType;
135            }
136    
137            public void setOutputType(String outputType) {
138                    this.outputType = outputType;
139            }
140    
141            /**
142             * One of the MergerStrategy enums
143             * @return mergerStrategy
144             */
145            public String getMergerStrategy() {
146                    return mergerStrategy.toString();
147            }
148    
149            /**
150             * One of the MergerStrategy enums
151             * @param mergerStrategy
152             */
153            public void setMergerStrategy(String mergerStrategy) {
154                    this.mergerStrategy = MergerStrategy.valueOf(mergerStrategy);
155            }
156    
157            public void handleRecord(IndexRecord rec) {
158                    try {
159                            if (log.isDebugEnabled()) {
160                                    log.debug("Received: " + rec);
161                            }
162                            if (!rec.getDataType().equals(inputType)) {
163                                    return;
164                            }
165                            long start = 0;
166                            if (log.isInfoEnabled()) {
167                                    start = System.currentTimeMillis();
168                                    log.info("Starting to process " + rec);
169                            }
170                            RadialSet rs = (RadialSet) BuilderFactory.createDataType(rec);
171                            update(rs);
172                            if (log.isInfoEnabled()) {
173                                    long howlong = System.currentTimeMillis() - start;
174                                    log.info("Took " + howlong / 1000.0 + "s to process " + rec);
175                            }
176                            computeAndWriteIfNeeded();
177                    } catch (Exception e) {
178                            log.error("Error handling " + rec, e);
179                    }
180            }
181    
182            private void computeAndWriteIfNeeded() {
183                    if (lastWrite == null || lastUpdate == null){
184                            return;
185                    }
186                    long timeSinceLastWrite = lastUpdate.getTime() - lastWrite.getTime();
187                    if (writeAfterEveryInputReceived || timeSinceLastWrite > getWriteIntervalSeconds() * 1000) {
188                            if (log.isInfoEnabled()) {
189                                    log.info(timeSinceLastWrite/1000.0
190                                                                    + " seconds since lastWrite: computing and writing outputs");
191                            }
192                            computeAndWriteOutputs();
193                    }
194            }
195    
196            private int numNewRadials = 0;
197    
198            private MergeableObservations observations = null;
199    
200            private PowerDensityLookup lookup = null;
201    
202            private Date lastUpdate = null;
203    
204            private Date lastWrite = null;
205    
206            private Location radarLocation = null;
207    
208            private float nyquist = 0;
209            
210            private String outputType = "";
211    
212            private synchronized void update(RadialSet rs) {
213                    initIfNeeded(rs);
214                    if (lastUpdate == null || rs.getTime().after(lastUpdate)) {
215                            lastUpdate = rs.getTime();
216                            if (lastWrite == null) {
217                                    // first time around
218                                    lastWrite = lastUpdate;
219                            }
220                    }
221                    Radial[] radials = rs.getRadials();
222                    for (int i = 0; i < radials.length; ++i) {
223                            update(radials[i], rs.getElevation(), rs.getRangeToFirstGateKms(), rs.getTime());
224                    }
225                    numNewRadials += radials.length;
226                    if (log.isDebugEnabled()) {
227                            log.debug("Finished updating with " + radials.length);
228                    }
229            }
230    
231            private synchronized void initIfNeeded(RadialSet rs) {
232                    float beamwidth = rs.getBeamWidth();
233                    float gatewidth = rs.getGateWidthKms();
234                    this.radarLocation = rs.getRadarLocation();
235                    this.nyquist = rs.getNyquist();
236                    
237                    // init grid if needed
238                    if (observations == null) {
239                            lookup = new PowerDensityLookup(beamwidth, azimuthalSpacingDegrees,
240                                            gatewidth, gateWidthKms);
241                            observations = new MergeableObservations(mergerStrategy, lookup);
242                    }
243    
244                    if (outputType == null || outputType.length() == 0) {
245                            outputType = rs.getTypeName();
246                    }
247            }
248    
249            private synchronized void update(Radial radial, float elevation, float dist_to_first_gate, Date date) {
250                    int center_az = Math.round(radial.getMidAzimuth()
251                                    / lookup.getAzimuthalSpacingDegrees());
252                    int center_e = Math.round(elevation
253                                    / lookup.getAzimuthalSpacingDegrees());
254            
255                    float[] radial_values = radial.getValues();
256                    
257                    for (int r = 0; r < radial_values.length; ++r){
258                            float value = radial_values[r];
259                            if (value != DataType.MissingData){
260                                    float range = radial.getGateWidthKms()*r + dist_to_first_gate;
261                                    int center_rn = Math.round(range/lookup.getGateSpacingKms());
262                                    Observation obs = new Observation(center_e, center_az, center_rn, value, date);
263                                    observations.add(obs);
264                            }
265                    }
266                    
267            }
268    
269            private final float distToFirst = 0;
270            protected synchronized void computeAndWriteOutputs() {
271                    if (numNewRadials > 0) {
272                            if (log.isDebugEnabled()) {
273                                    log.debug("Writing new product with " + numNewRadials
274                                                    + " updated/new radials");
275                            }
276                            // Prune
277                            Date pruneTime = new Date(lastUpdate.getTime() - (int)(ageOffInMinutes
278                                            * 60 * 1000));
279                            int numPruned = observations.pruneBefore(pruneTime);
280                            if (log.isInfoEnabled()) {
281                                    log.info("Pruned " + numPruned + " observations before "
282                                                    + pruneTime);
283                            }
284                            
285                            // Compute output polar grids
286                            PolarGrid[] mergedGrids = new PolarGrid[numElev];
287                            for (int e = 0; e < numElev; ++e) {
288                                    float elevation = e * azimuthalSpacingDegrees;
289                                    // Create polar grid to hold the merged value. Initialize at zero
290                                    PolarGrid pg = new PolarGrid(radarLocation, lastUpdate, outputType,
291                                                    azimuthalSpacingDegrees, gateWidthKms, elevation,
292                                                    lookup.getBeamwidth(), nyquist, distToFirst, numGates, 0f);
293                                    mergedGrids[e] = pg;
294                            }
295                            observations.fillMergedValues(mergedGrids);
296    
297                            // Compute algorithms
298                            PolarGrid[] algs = performAlgorithms(mergedGrids);
299    
300                            PolarGrid[] grids = new PolarGrid[mergedGrids.length + algs.length];
301                            for (int i=0; i < mergedGrids.length; ++i){
302                                    grids[i] = mergedGrids[i];
303                            }
304                            for (int i=0; i < algs.length; ++i){
305                                    grids[mergedGrids.length+i] = algs[i];
306                            }
307                            
308    
309                            // Write
310                            for (PolarGrid pg : grids) {
311                                    String subtype = getSubTypeForElevation(pg.getElevation());
312                                    DataEncoder.writeDataAndNotify(pg, getOutputDir(),new String[] { subtype });
313                            }
314    
315                            numNewRadials = 0;
316                            lastWrite = lastUpdate;
317                    } else {
318                            log.debug("Nothing to write");
319                    }
320            }
321    
322            private PolarGrid[] performAlgorithms(PolarGrid[] grids) {
323                    List<PolarGrid> result = new ArrayList<PolarGrid>();
324                    if (isCompositeEnabled()) {
325                            PolarGrid example = grids[0];
326                            PolarGrid composite = new PolarGrid(example, example.getTypeName() + "Composite");
327                            float[][] comp_values = composite.getValues();
328                            for (int g = 0; g < grids.length; ++g) {
329                                    float[][] values = grids[g].getValues();
330                                    for (int i = 0; i < values.length; ++i) {
331                                            for (int j = 0; j < values[0].length; ++j) {
332                                                    comp_values[i][j] = Math.max(comp_values[i][j],
333                                                                    values[i][j]);
334                                            }
335                                    }
336                            }
337                            result.add(composite);
338                    }
339                    return result.toArray(new PolarGrid[0]);
340            }
341    
342            public boolean isWriteAfterEveryInputReceived() {
343                    return writeAfterEveryInputReceived;
344            }
345    
346            /** If set to true, then writeIntervalSeconds is ignored. */
347            public void setWriteAfterEveryInputReceived(boolean writeAfterEveryInputReceived) {
348                    this.writeAfterEveryInputReceived = writeAfterEveryInputReceived;
349            }
350    
351    }