JavaDoc


001    /**
002     * Copyright (c) 2003 Daffodil Software Ltd all rights reserved,
003     * Modifications Copyright (c) 2008 Regiscope Digital Imaging Co, LLC, All rights reserved.
004     * This program is free software; you can redistribute it and/or modify
005     * it under the terms of version 2 of the GNU General Public License as
006     * published by the Free Software Foundation.
007     * There are special exceptions to the terms and conditions of the GPL
008     * as it is applied to this software. See the GNU General Public License for more details.
009     *
010     * This program is distributed in the hope that it will be useful,
011     * but WITHOUT ANY WARRANTY; without even the implied warranty of
012     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013     * GNU General Public License for more details.
014     *
015     * You should have received a copy of the GNU General Public License
016     * along with this program; if not, write to the Free Software
017     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018     */
019    
020    package org.dbreplicator.replication;
021    
022    import java.net.*;
023    import java.net.UnknownHostException;
024    import java.rmi.*;
025    import java.rmi.registry.*;
026    import java.rmi.server.*;
027    import java.sql.*;
028    import java.util.*;
029    import org.dbreplicator.replication.zip.ImportedTablesInfo;
030    import org.dbreplicator.graph.DirectedGraph;
031    import org.dbreplicator.replication.DBHandler.AbstractDataBaseHandler;
032    import org.dbreplicator.replication.schedule.ScheduleHandler;
033    import org.apache.log4j.Logger;
034    import java.io.File;
035    import org.apache.log4j.PropertyConfigurator;
036    import java.io.Writer;
037    import java.io.BufferedWriter;
038    import java.io.FileWriter;
039    import java.io.InputStreamReader;
040    import java.io.BufferedReader;
041    import java.io.IOException;
042    import org.dbreplicator.graph.Vertex;
043    import javax.sql.*;
044    
045    /**
046     * This is the main class of DBReplicator. Either side, whether publisher or
047     * subscriber, has to get an instance of this class by the method getInstance and
048     * then connect to the required database by using the method setDataSource.
049     * <p>
050     * The Publisher can create a publication by executing the method:
051     * <p>
052     * createPublication (get an instance of Publication class),
053     * <p>
054     *  and the Subscriber can create a subscription by executing the method:
055     * <p>
056     * createSubscription (get an instance of Subscription class).
057     * <p>
058     * After that, control is transferred to the publisher and subscriber,
059     * and they start communicating with each other. This class also implements the _ReplicationServerImpl interface, so it provides the publication's
060     * object when needed remotely (at the time of subscribing, snapshot, or synchronize).
061     *
062     */
063    
064    public class ReplicationServer
065        extends UnicastRemoteObject implements _ReplicationServer, _ReplicationServerImpl {
066    
067      /**
068       * Replication server name and url
069       */
070      private String name, url;
071    
072      /**
073       * Port on which Replication server is running
074       */
075      private int port;
076    
077      /**
078       * Mapping of connections
079       */
080      private ConnectionPool connectionPool;
081    
082      /**
083       * Mapping of publications
084       */
085      private TreeMap pubMap = new TreeMap(String.CASE_INSENSITIVE_ORDER);
086    
087      /**
088       * Mapping of subscriptions
089       */
090    
091      public ScheduleHandler scheduleHandler;
092    
093      private TreeMap subMap = new TreeMap(String.CASE_INSENSITIVE_ORDER);
094      private Connection defaultConnection;
095      public String driver = "",URL = "",user,
096                               password,
097                               portNo,
098                               databaseName,
099                               databaseServerName,
100                               dataBaseName,
101                               dBPortNo,
102                               vendorName;
103    
104    
105      ArrayList tablesInCycle=null;
106    
107      protected static Logger log = Logger.getLogger(ReplicationServer.class.getName());
108    
109      private ReplicationServer(int port0, String url0) throws RemoteException {
110        port = port0;
111        url = url0;
112      }
113    
114      /**
115       * This method is responsible for getting the Replication Server instance.
116       * It starts a replication server at the specified port and m/c by starting
117       * the rmi registry over the specified port and m/c and binding this object.
118       *
119       * @param     port0          Port no where we want to start the replication server.
120       * @param     url0           System's name or ipaddress where to start replication server.
121       * @return    rep            instance of replication server
122       * @throws    RepException   if port is busy or m/c name is passed as localhost
123       */
124      public static ReplicationServer getInstance(int port0, String url0) throws RepException {
125       //initialize and load log4j.properties file
126        initializeLog4j();
127        if (url0.equalsIgnoreCase("localhost")) {
128          log.error("Url cannot be localhost");
129          throw new RepException("REP004", null);
130        }
131        ReplicationServer rep;
132        try {
133          String ipadd = null;
134          try {
135            ipadd = InetAddress.getByName(url0).getHostAddress();
136                    /* bjt - listen on all addresses of this machine */
137            //ipadd = "0.0.0.0";
138          }
139            catch (UnknownHostException ex)
140              {
141    //ex.printStackTrace();
142            log.error(ex.getMessage(), ex);
143            throw new RepException("REP004", new Object[]{ex.getMessage()});
144          } 
145          rep = new ReplicationServer(port0, url0);
146          Registry r = LocateRegistry.createRegistry(port0);
147    
148          //String url = "rmi://"+ url0 +":"+ Integer.toString(port0)+"/"+url0+"_"+port0;
149          //String url = "rmi://" + ipadd + ":" + Integer.toString(port0) + "/" +ipadd + "_" + port0;
150          String url = "rmi://0.0.0.0:" + Integer.toString(port0) + "/" + url0 + "_" + port0;
151          Naming.rebind(url, rep);
152          log.info("URL " + url + " PORT " + port0);
153        }
154        catch (RemoteException ex) {
155          RepConstants.writeERROR_FILE(ex);
156          throw new RepException("REP001", new Object[] {ex.getMessage()});
157        }
158        catch (MalformedURLException ex) {
159          RepConstants.writeERROR_FILE(ex);
160          throw new RepException("REP004", new Object[] {ex.getMessage()});
161        }
162        log.debug("ReplicationServer started");
163        return (ReplicationServer) rep;
164      }
165    
166      /**
167       * This method is called to get the publication's object even after
168       * creating it. It is well implemented in the method getPriPublication.
169       *
170       * @param    pubName         Name of the publication whose object is required.
171       * @return   pub             Publication class' object.
172       * @throws RemoteException
173       * @throws RepException
174       */
175    
176      public _Publication getPublication(String pubName) throws RemoteException, RepException {
177        return getPriPublication(pubName);
178      }
179    
180      /**
181       * Returns the publication object when called by the subscriber remotely.
182       *
183       * @param pubName
184       * @return Publication Object
185       * @throws RemoteException
186       * @throws RepException
187       */
188    
189      public _PubImpl getRemotePublication(String pubName) throws RemoteException, RepException {
190        return getPriPublication(pubName);
191      }
192    
193      /**
194       * This method returns the publication object. It is implementd for the
195       * cases in which for some reason the publication's object can not
196       * be gotten directly, as when the replication server is restarted. This
197       * method searches publications and reptable tables and gathers the required
198       * information to rebuild the publication's object.
199       *
200       * @param    pubName          Name of the publication
201       * @return   pub              Publication's object.
202       * @throws   RemoteException
203       * @throws   RepException
204       */
205    
206      private Publication getPriPublication(String pubName) throws RemoteException, RepException {
207        Publication pub = (Publication) pubMap.get(pubName);
208        //pub will be null if the remote replication server has been restarted
209        if (pub != null) {
210          return pub;
211        }
212        //connectionPool will be null if could not get connection from datasource
213        if (connectionPool == null) {
214          throw new RepException("REP002", null);
215        }
216        //Here it comes if replication server has been restarted
217        Connection con = connectionPool.getConnection(pubName);
218        MetaDataInfo mdis = Utility.getDatabaseMataData(con);
219        ResultSet rs = null;
220        PreparedStatement prStt1 = null;
221        try {
222          //searches in publication table
223          prStt1 = con.prepareStatement(RepConstants.loadPublicationQuery);
224          prStt1.setString(1, pubName);
225          rs = prStt1.executeQuery();
226          boolean f1 = rs.next();
227    //      if (!rs.next()) {
228            if (!f1) {
229            return null;
230          }
231          String conflictReolver = rs.getString(RepConstants.publication_conflictResolver2);
232          String serverName = rs.getString(RepConstants.publication_serverName3);
233          MetaDataInfo mdi = Utility.getDatabaseMataData(con);
234          AbstractDataBaseHandler dataBaseHandler = Utility.getDatabaseHandler(connectionPool, pubName);
235          //searches in reptable
236          ArrayList tableNames = new ArrayList();
237                prStt1 = con.prepareStatement(RepConstants.loadRepTableQuery);
238                prStt1.setString(1, pubName);
239                rs = prStt1.executeQuery();
240          while (rs.next()) {
241            String tableName = rs.getString(RepConstants.repTable_tableName2);
242            String filterClause = rs.getString(RepConstants.repTable_filter_clause3);
243            SchemaQualifiedName sname = new SchemaQualifiedName(mdis, tableName);
244            RepTable repTable = new RepTable(sname, filterClause,RepConstants.publisher);
245            repTable.setServerType(RepConstants.publisher);
246            String createShadowTable = rs.getString(RepConstants.repTable_createshadowtable6);
247            String cyclicDependency = rs.getString(RepConstants.repTable_cyclicdependency7);
248            String rep_CR = rs.getString(RepConstants.repTable_conflict_resolver4);
249            repTable.setConflictResolver(rep_CR == null ? conflictReolver : rep_CR);
250            repTable.setCreateShadowTable(createShadowTable);
251            repTable.setCyclicDependency(cyclicDependency);
252            mdi.setPrimaryColumns(repTable, sname.getSchemaName(),sname.getTableName());
253            mdi.setForeignKeyColumns(repTable, sname.getSchemaName(),sname.getTableName());
254            mdi.setAllColumns(repTable, sname.getSchemaName(),sname.getTableName());
255            dataBaseHandler.setIgnoredColumns(con, pubName, repTable);
256            tableNames.add(repTable);
257          }
258                /** @todo   rs has been closed  */
259                rs.close();
260          try {
261            name = InetAddress.getLocalHost().getHostName() + "_" + port;
262          }
263          catch (UnknownHostException ex) {
264            ex.printStackTrace();
265            RepConstants.writeERROR_FILE(ex);
266            throw new RepException(ex.getMessage(), null);
267          }
268          pub = new Publication(connectionPool, pubName, name, this);
269          pub.setConflictResolver(conflictReolver);
270          pub.setPublicationTables(tableNames);
271          pubMap.put(pubName, pub);
272          log.info("Publisher name " + pubName + " conflictResolver " +conflictReolver + " TableNames " + tableNames.toString());
273          return pub;
274        }
275        catch (SQLException ex) {
276          log.debug("Returning Null Publicaiton");
277          return null;
278        }
279        finally {
280              try {
281            if (prStt1 != null)
282                prStt1.close();
283              }
284              catch (SQLException ex1) {
285              }
286                    connectionPool.returnConnection(con);
287        }
288      }
289    
290      /**
291       * It is responsible for getting the default connection with the database.
292       * Default connection implies the connection of the replication server.
293       *
294       * @param   driver0       The driver name of the concerned databse.
295       * @param   url0          The URL of the cocerned database.
296       * @param   user0         The username for the concerned databse.
297       * @param   password0     The password for the concerned databse.
298       * @throws  RepException  In case it can not get connection with
299       *                        the database.
300       */
301    
302      public void setDataSource(String driver0, String url0, String user0,
303                                String password0) throws RepException {
304        try {
305          driver = driver0;
306          URL = url0;
307          connectionPool = new ConnectionPool(url0, driver0, user0, password0);
308          connectionPool.setLocalAddress(url);
309          connectionPool.setLocalPortNo(port);
310          //Gets connection.
311          defaultConnection = connectionPool.getDefaultConnection();
312          AbstractDataBaseHandler adh = Utility.getDatabaseHandler(connectionPool,defaultConnection);
313          connectionPool.returnConnection(defaultConnection);
314          scheduleHandler = new ScheduleHandler(this, adh);
315        }
316        catch (RepException ex) {
317          RepConstants.writeERROR_FILE(ex);
318          throw ex;
319        }
320        catch (Exception ex) {
321          RepConstants.writeERROR_FILE(ex);
322          throw new RepException("REP0", new Object[] {ex.getMessage()});
323        }
324      }
325    
326      public void setDataSource(DataSource dataSource) throws RepException {
327        try {
328          DatabaseMetaData metaData = dataSource.getConnection().getMetaData();
329          driver = metaData.getDriverName();
330          URL = metaData.getURL();
331          connectionPool = new ConnectionPool(dataSource);
332          connectionPool.setLocalAddress(url);
333          connectionPool.setLocalPortNo(port);
334          //Gets connection.
335          defaultConnection = connectionPool.getDefaultConnection();
336          AbstractDataBaseHandler adh = Utility.getDatabaseHandler(connectionPool,defaultConnection);
337          connectionPool.returnConnection(defaultConnection);
338          scheduleHandler = new ScheduleHandler(this, adh);
339        }
340        catch (RepException ex) {
341          RepConstants.writeERROR_FILE(ex);
342          throw ex;
343        }
344        catch (Exception ex) {
345          RepConstants.writeERROR_FILE(ex);
346          throw new RepException("REP0", new Object[] {ex.getMessage()});
347        }
348      }
349    
350      public void setDataSource(DataSource dataSource, String username, String password) throws RepException {
351          try {
352            DatabaseMetaData metaData = dataSource.getConnection().getMetaData();
353            driver = metaData.getDriverName();
354            URL = metaData.getURL();
355            connectionPool = new ConnectionPool(dataSource, username, password);
356            connectionPool.setLocalAddress(url);
357            connectionPool.setLocalPortNo(port);
358            //Gets connection.
359            defaultConnection = connectionPool.getDefaultConnection();
360            AbstractDataBaseHandler adh = Utility.getDatabaseHandler(connectionPool,defaultConnection);
361            connectionPool.returnConnection(defaultConnection);
362          scheduleHandler = new ScheduleHandler(this, adh);
363        }
364        catch (RepException ex) {
365          RepConstants.writeERROR_FILE(ex);
366          throw ex;
367        }
368        catch (Exception ex) {
369          RepConstants.writeERROR_FILE(ex);
370          throw new RepException("REP0", new Object[] {ex.getMessage()});
371        }
372      }
373    
374    
375      public void setDataSource(String  dataBaseName0,String user0,String password0, String dBPortNo0, String databaseServerName0, String vendorName0) throws RepException {
376        dataBaseName =dataBaseName0;
377        user =user0;
378        password =password0;
379        dBPortNo =dBPortNo0;
380        databaseServerName =databaseServerName0;
381        vendorName =vendorName0;
382       try {
383         DBDataSource dbs = new DBDataSource(dataBaseName0, user0, password0,databaseServerName0, dBPortNo0,vendorName0);
384         DataSource datasource = dbs.getDataSource();
385         defaultConnection = datasource.getConnection(user0,password0);
386         connectionPool = new ConnectionPool(datasource, user0, password0);
387    //    defaultConnection = connectionPool.getDefaultConnection();
388         AbstractDataBaseHandler adh = Utility.getDatabaseHandler(connectionPool,defaultConnection);
389         scheduleHandler = new ScheduleHandler(this, adh);
390       }
391       catch (SQLException ex) {
392         RepConstants.writeERROR_FILE(ex);
393          throw new RepException("REP0", new Object[] {ex.getMessage()});
394       }
395       catch (RepException ex) {
396        throw ex;
397       }
398      }
399    
400      /**
401       * Checks the following :
402       * <p>
403       * Publication alredy exists.
404       * <p>
405       * Specified Table(s) exists in database.
406       * <p>
407       * Each table must have primary key.
408       * <p>
409       * Sequence of tables specified ( Primary Table than Foreign Table )
410       * <p>
411       * Then, Create the instance of Publication, set the instance of RepTable [] to pub
412       * <p>
413       * Put it in Map
414       *
415       * @param   pubName        Name of the Publication
416       * @param   tableNames     Names of the tables to be published.
417       * @return  pub1           Publication class' object.
418       * @throws  RepException   In case of violation of above checked conditions.
419       */
420    
421      public _Publication createPublication(String pubName, String[] tableNames) throws RepException {
422        log.info("Create Publication " + pubName);
423        return createPublication(pubName,tableNames,null);
424          }
425    
426      /**
427       * Check the Dublicacy of tables in  given list
428       * If dublicate table name occure then it through
429       * the RepException because it further create the
430       * problemes in Replication process.
431       * @param pubTables ArrayList
432       * @param sname SchemaQualifiedName
433       * @param pubName String
434       * @throws RepException
435       */
436      private void checkTablesDuplicyInPublication(ArrayList pubTables, SchemaQualifiedName sname, String pubName) throws RepException {
437        if (pubTables.size() > 0) {
438          for (int j = 0; j < pubTables.size(); j++) {
439            RepTable repTable = (RepTable) pubTables.get(j);
440            String sqname = repTable.getSchemaQualifiedName().toString();
441            log.debug("Checking Table " + sqname +" if already existing in database");
442            if (sqname.equals(sname.toString())) {
443                throw new RepException("REP013", new Object[] {sname.toString(),pubName});
444            }
445          }
446        }
447      }
448    
449      public _Publication createPublicationBK(String pubName, String[] tableNames) throws RepException {
450        //If database is not binded
451        if (connectionPool == null) {
452          throw new RepException("REP002", null);
453        }
454        //Checks in publication table in database
455    
456        if (checkPublicationExistance(pubName)) {
457          //if(pubMap.containsKey(pubName))
458          throw new RepException("REP014", new Object[] {pubName});
459        }
460        //Checks the properness of given table names
461        if (tableNames == null || tableNames.length == 0) {
462          throw new RepException("REP012", new Object[] {pubName});
463        }
464    
465        TreeMap repTableMap = new TreeMap(String.CASE_INSENSITIVE_ORDER);
466        ArrayList pubTableList = new ArrayList();
467            Connection connection = null;
468    
469        try {
470    
471              connection = connectionPool.getConnection(pubName);
472              MetaDataInfo mdi = Utility.getDatabaseMataData(connection);
473          for (int i = 0; i < tableNames.length; i++) {
474            //Converts Table names  to schema qualified name.
475            SchemaQualifiedName sname = new SchemaQualifiedName(mdi, tableNames[i]);
476            if (repTableMap.containsKey(sname.toString())) {
477              throw new RepException("REP013", new Object[] {sname.toString(),pubName});
478            }
479            String schema = sname.getSchemaName();
480            String table = sname.getTableName();
481            //checks whether the table exists in the database or not.
482            mdi.checkTableExistance(sname);
483            schema = sname.getSchemaName();
484            RepTable repTable = new RepTable(sname, RepConstants.publisher);
485            //repTable.setConflictResolver(RepConstants.publisher_wins);
486            //Sets Sorted primary key columns in repTable.
487            mdi.setPrimaryColumns(repTable, schema, table);
488            mdi.setForeignKeyColumns(repTable, schema, table);
489            //Checks for First Primary key then foreign key tables else exception.
490            mdi.checkTableSequenceAccordingForeignKey(schema, table, pubTableList);
491            pubTableList.add(sname.toString().toUpperCase());
492            repTableMap.put(sname.toString(), repTable);
493          }
494          //name = InetAddress.getLocalHost().getHostName()+"_"+pubName+"_"+port;
495          name = InetAddress.getLocalHost().getHostName() + "_" + port;
496          //Creates publication by setting it's DataBaseHandler &  XMLCreator
497          Publication publ = new Publication(connectionPool, pubName, name, this);
498          //Sets publication's repTable elements
499          publ.setPublicationTables(new ArrayList(repTableMap.values()));
500          pubMap.put(pubName, publ);
501          return publ;
502        }
503        catch (RepException ex) {
504          throw ex;
505        }
506        catch (Exception ex) {
507          log.error(ex.getMessage(), ex);
508          throw new RepException("REP031", new Object[] {pubName, ex.getMessage()});
509        }
510       finally {
511            connectionPool.returnConnection(connection);
512       }
513      }
514    
515      /**
516       * This method returns the subscription's object, It is implemented for the
517       * cases in which for some reason the subscription's object can not
518       * be gotten directly, As when the replication server is restarted. This
519       * method searches subscriptions and reptable tables and gathers the required
520       * information to rebuild the subscription's object.
521       *
522       * @param      subName         Subscription's name
523       * @return     sub             Subscription class' object
524       * @throws     RepException    If could not set up connection with the database.
525       */
526    
527      public _Subscription getSubscription(String subName) throws RepException {
528        Subscription sub = (Subscription) subMap.get(subName);
529        if (sub != null) {
530          return sub;
531        }
532        if (connectionPool == null) {
533          throw new RepException("REP002", null);
534        }
535        AbstractDataBaseHandler dataBaseHandler = Utility.getDatabaseHandler(connectionPool, subName);
536        //Here it comes if the replication server has been restarted.
537            Connection con = null;
538        PreparedStatement prStt1 = null;
539        ResultSet rs = null;
540        try {
541              con = connectionPool.getFreshConnection(subName);
542              MetaDataInfo mds = Utility.getDatabaseMataData(con);
543          prStt1 = con.prepareStatement(RepConstants.loadSubscriptionQuery);
544          prStt1.setString(1, subName);
545           rs = prStt1.executeQuery();
546          if (!rs.next()) {
547            return null;
548          }
549    
550          //Searches in Subscription table.
551          String pubName = rs.getString(RepConstants.subscription_pubName2);
552          String conflictReolver = rs.getString(RepConstants.subscription_conflictResolver3);
553          String serverName = rs.getString(RepConstants.subscription_serverName4);
554          ArrayList tableNames = new ArrayList();
555    
556          //Searches in the Rep Table
557                prStt1 = con.prepareStatement(RepConstants.loadRepTableQuery);
558                prStt1.setString(1, subName);
559                rs = prStt1.executeQuery();
560          if (!rs.next()) {
561            return null;
562          }
563    
564          sub = new Subscription(connectionPool, subName, serverName, this);
565          ArrayList repTables = new ArrayList();
566          do {
567            String tableName = rs.getString(RepConstants.repTable_tableName2);
568            String filterClause = rs.getString(RepConstants.repTable_filter_clause3);
569            String createShadowTable = rs.getString(RepConstants.repTable_createshadowtable6);
570            String cyclicDependency = rs.getString(RepConstants.repTable_cyclicdependency7);
571            SchemaQualifiedName sname = new SchemaQualifiedName(mds, tableName);
572            RepTable repTable = new RepTable(sname, filterClause, RepConstants.subscriber);
573            repTable.setConflictResolver(conflictReolver);
574            repTable.setServerType(RepConstants.subscriber);
575            repTable.setCreateShadowTable(createShadowTable);
576            repTable.setCyclicDependency(cyclicDependency);
577            mds.setPrimaryColumns(repTable, sname.getSchemaName(), sname.getTableName());
578            mds.setForeignKeyColumns(repTable, sname.getSchemaName(), sname.getTableName());
579            mds.setAllColumns(repTable, sname.getSchemaName(),sname.getTableName());
580            dataBaseHandler.setIgnoredColumns(con, subName,repTable);
581            repTables.add(repTable);
582          }
583          while (rs.next());
584          sub.setPublicatonName(pubName);
585          sub.setSubscriptionTables(repTables);
586          subMap.put(subName, sub);
587          log.info("Subscriber name " + subName + " Publication Name " + pubName);
588          return sub;
589        }
590        catch (SQLException ex) {
591          RepConstants.writeERROR_FILE(ex);
592          return null;
593        }
594        catch (RemoteException ex) {
595          log.error(ex.getMessage(), ex);
596          throw new RepException(ex.getMessage(), null);
597        }
598        finally {
599              try {
600            if (rs != null) {
601                  rs.close();
602                  prStt1.close();
603                }
604              }
605              catch (SQLException ex1) {
606              }
607                     connectionPool.returnConnection(con);
608            }
609      }
610    
611      /**
612       * Checks whether a subscription with the same name already exists
613       * in Subscription Table. Creates the subscription's object,
614       * then adds an entry in the subscription Map.
615       *
616       * @param   subName           Name of the subscription.
617       * @param   pubName           Name of the corresponding publication.
618       * @return  sub               subscription class' object.
619       * @throws  RepException      If the above check violates.
620       */
621    
622      public _Subscription createSubscription(String subName, String pubName) throws RepException {
623        log.debug("Create subscription " + subName + " with publication " + pubName);
624    
625        //Checks if the subscription with the same name already exists in Subscription Table
626        if (checkSubscriptionExistance(subName)) {
627          throw new RepException("REP023", new Object[] {subName});
628        }
629        try {
630          name = InetAddress.getLocalHost().getHostName() + "_" + port;
631          Subscription sub = new Subscription(connectionPool, subName, name, this);
632          sub.setPublicatonName(pubName);
633          subMap.put(subName, sub);
634          log.info("created subscription " + subName + " with publication " +pubName);
635          return sub;
636        }
637        catch (RepException ex) {
638          log.error(ex.getMessage(), ex);
639          RepConstants.writeERROR_FILE(ex);
640          throw ex;
641        }
642        catch (Exception ex) {
643          log.error(ex.getMessage(), ex);
644          throw new RepException("REP042", new Object[] {subName, ex.getMessage()});
645        }
646      }
647    
648      public Connection getConnection(String pub_sub_Name) throws RepException {
649        return connectionPool.getConnection(pub_sub_Name);
650      }
651    
652      public Connection getDefaultConnection() {
653        return defaultConnection;
654      }
655    
656      /**
657       * Searches into the publications table for the publication name.
658       *
659       * @param                pubName
660       * @return               true if found, else false.
661       * @throws RepException
662       */
663    
664      private boolean checkPublicationExistance(String pubName) throws RepException {
665        AbstractDataBaseHandler dbh = Utility.getDatabaseHandler(connectionPool,pubName);
666        String pubs = " Select " + RepConstants.publication_pubName1 +
667            " from " + dbh.getPublicationTableName() + "  where " +
668            RepConstants.publication_pubName1 + " = '" + pubName + "'";
669        Statement stt = null;
670        try {
671          stt = defaultConnection.createStatement();
672          ResultSet rsPubs = stt.executeQuery(pubs);
673          log.debug("Query checkPublicationExistance " + pubs);
674          return rsPubs.next();
675        }
676        catch (SQLException ex) {
677          // Ignore the Exception
678        }
679        finally {
680          try {
681            if (stt != null)
682            stt.close();
683          }
684          catch (SQLException ex1) {
685            // Ignore the Exception
686          }
687        }
688        return false;
689      }
690    
691      /**
692       * Searches the subscriptions table for the subscription name.
693       *
694       * @param                subName
695       * @return               true if found, else false.
696       * @throws RepException
697       */
698    
699      private boolean checkSubscriptionExistance(String subName) throws RepException {
700        AbstractDataBaseHandler dbh = Utility.getDatabaseHandler(connectionPool,subName);
701        String pubs = " Select " + RepConstants.subscription_subName1
702            + " from " + dbh.getSubscriptionTableName() + " where " +
703            RepConstants.subscription_subName1 + " = '" + subName + "'";
704        Statement stt = null;
705        try {
706          stt = defaultConnection.createStatement();
707          ResultSet rsPubs = stt.executeQuery(pubs);
708          log.debug("Query checkSubscriptionExistence " + pubs);
709          return rsPubs.next();
710        }
711        catch (SQLException ex) {
712          // Ignore the Exception
713        }
714        finally {
715          try {
716            if (stt != null)
717            stt.close();
718          }
719          catch (SQLException ex1) {
720            // Ignore the Exception
721          }
722        }
723        return false;
724      }
725    
726      public void refershPublication(String pubName) {
727        pubMap.remove(pubName);
728      }
729    
730      public void refershSubscription(String subName) {
731        subMap.remove(subName);
732      }
733    
734      public String getServerName() throws RemoteException {
735        return name;
736      }
737    
738      /**
739       * This method returns the scheduleHandler instance which is used during
740       *  add,edit and remove Schedule methods in subscription class.
741       *
742       * @return ScheduleHandler
743       */
744    
745      public ScheduleHandler getScheduleHandler() {
746        return scheduleHandler;
747      }
748    
749      /**
750       * To add more tables to a Publication
751       * <p>
752       * Checks the following :
753       * <p>
754       * Publication  exists or not.
755       * <p>
756       * Specified Table(s) exists in database.
757       * <p>
758       * Each table must have primary key.
759       * <p>
760       * Sequence of tables specified ( Primary Table than Foreign Table )
761       *
762       * @param   pubName        Name of the Publication
763       * @param   tableNames0    Names of the tables to be published.
764       * @param   filterClauses  Filters on the published tables.
765       * @param   pubRepTables   ArrayList of tables already published.
766       * @param   pub            publication object.
767       * @throws  RepException   In case of violation of above checked conditions.
768       */
769    
770      public void addTableToPublication(String pubName, String[] tableNames0,
771                                        String[] filterClauses,
772                                        ArrayList pubRepTables,
773                                        Publication pub) throws RepException {
774        Publication pub1 = pub;
775        //If database is not binded
776        if (connectionPool == null) {
777          throw new RepException("REP002", null);
778        }
779        //Checks in publication table in database
780    
781        if (!checkPublicationExistance(pubName)) {
782          //if(!pubMap.containsKey(pubName))
783          throw new RepException("REP036", new Object[] {pubName});
784        }
785        //Checks the properness of given table names
786        if (tableNames0 == null || tableNames0.length == 0) {
787          throw new RepException("REP012", new Object[] {pubName});
788        }
789        ArrayList pubTableList = new ArrayList();
790        ArrayList repTableList = new ArrayList();
791        ArrayList existingPubTableList = new ArrayList();
792        ArrayList newRepTableList = new ArrayList();
793        String conflictResolver = null;
794            Connection connection = null;
795        try {
796          conflictResolver = pub1.getConflictResolver();
797    
798          //get already existing tables in publisher
799          for (int i = 0; i < pubRepTables.size(); i++) {
800            RepTable repTable = (RepTable) pubRepTables.get(i);
801            existingPubTableList.add(repTable.getSchemaQualifiedName().toString());
802    // System.out.println("existingPubTableList " +repTable.getSchemaQualifiedName().toString());
803          }
804                    connection = connectionPool.getConnection(pubName);
805                    MetaDataInfo mdi = Utility.getDatabaseMataData(connection);
806            String[]  tableNames = mdi.getTablesHierarchy(tableNames0);
807    
808    //      initialising newRepTableList with new tables in publisher
809          for (int j = 0; j < tableNames.length; j++) {
810            if (tableNames[j].equalsIgnoreCase("")) {
811              throw new RepException("REP317", null);
812            }
813            SchemaQualifiedName sname = new SchemaQualifiedName(mdi, tableNames[j]);
814            mdi.checkTableExistance(sname);
815            if (existingPubTableList.contains(sname.toString())) {
816              throw new RepException("REP310", new Object[] {tableNames[j]});
817            }
818            else
819             existingPubTableList.add(sname.toString());
820    // System.out.println("new tables adding to existing " + sname.toString());
821          }
822    
823          String[] allTables = new String[existingPubTableList.size()];
824          existingPubTableList.toArray( (Object[]) allTables);
825          allTables = mdi.getTablesHierarchy(allTables);
826    
827          for (int i = 0; i < allTables.length; i++) {
828            //Converts Table names  to schema qualified name.
829            SchemaQualifiedName sname = new SchemaQualifiedName(mdi, allTables[i]);
830            checkTablesDuplicyInPublication(repTableList, sname, pubName);
831            String schema = sname.getSchemaName();
832            String table = sname.getTableName();
833            //checks whether the table exists in the database or not.
834            mdi.checkTableExistance(sname);
835            //Checks for First Primary key then foreign key tables else exception.
836            mdi.checkTableSequenceAccordingForeignKey(schema, table, pubTableList);
837    
838            RepTable repTable = new RepTable(sname, RepConstants.publisher);
839            //Sets Sorted primary key columns in repTable.
840            mdi.setPrimaryColumns(repTable, schema, table);
841    
842            //we will call mdi.setAllColumns in publication.addTableToPublication
843    
844            repTable.setConflictResolver(conflictResolver);
845            try {
846    //System.out.println("filter clause for " + sname + " = " +pub1.getFilterClause(sname));
847              repTable.setFilterClause(pub1.getFilterClause(sname));
848            }
849            catch (RemoteException ex1) {
850              ex1.getMessage();
851              //ignore the exception in case of new tables
852            }
853            pubTableList.add(sname.toString().toUpperCase());
854            repTableList.add(repTable);
855    //System.out.println("repTableList " + repTableList.get(i));
856          }
857           pub.setPublicationTables(repTableList);
858           //tableNames0-without hierarchy set
859           for (int i = 0; i < tableNames0.length; i++) {
860            if (filterClauses[i] != null)
861              pub.setFilter(tableNames0[i], filterClauses[i]);
862           }
863    }
864        catch (RepException ex) {
865    //      ex.printStackTrace();
866          throw ex;
867        }
868        catch (Exception ex) {
869    //      ex.printStackTrace();
870          throw new RepException("REP031", new Object[] {pubName, ex.getMessage()});
871        }
872       finally {
873            connectionPool.returnConnection(connection);
874       }
875      }
876    
877      /**
878         * To drop tables from a Publication
879             * <p>
880         * Checks the following :
881             * <p>
882         * Publication  exists or not.
883             * <p>
884         * Specified Table(s) exists in database.
885             * <p>
886         * Each table must have primary key.
887             * <p>
888         * Sequence of tables specified ( Primary Table than Foreign Table )
889             *
890         * @param   pubName        Name of the Publication
891         * @param   tableNames     Names of the tables to be published.
892         * @return  pub1           Publication class' object.
893         * @throws  RepException   In case of violation of above checked conditions.
894         */
895    
896      public ArrayList dropTableFromPublication(String pubName, String[] tableNames,
897                                                Publication pub, ArrayList pubRepTables) throws RepException {
898          //If database is not binded
899          if (connectionPool == null) {
900            throw new RepException("REP002", null);
901          }
902          //Checks in publication table in database
903    
904          if (!checkPublicationExistance(pubName)) {
905            //if(!pubMap.containsKey(pubName))
906            throw new RepException("REP036", new Object[] {pubName});
907          }
908          //Checks the properness of given table names
909          if (tableNames == null || tableNames.length == 0) {
910            throw new RepException("REP012", new Object[] {pubName});
911          }
912          ArrayList newPubRepTableList = new ArrayList();
913              Connection connection = null;
914    
915          try {
916                    connection = connectionPool.getConnection(pubName);
917                    MetaDataInfo mdi = Utility.getDatabaseMataData(connection);
918            mdi.checkChildTableIncludedInDropTableList(pubRepTables,tableNames);
919            tableNames = mdi.getTablesHierarchy(tableNames);
920          int noOfAllowedDropTables = 0;
921    //      initialising newRepTableList with tables to be dropped from publisher
922            for (int j = 0; j < tableNames.length; j++) {
923              SchemaQualifiedName sname = new SchemaQualifiedName(mdi, tableNames[j]);
924              String schema = sname.getSchemaName();
925              String table = sname.getTableName();
926              mdi.checkTableExistance(sname);
927              RepTable repTable = new RepTable(sname, RepConstants.publisher);
928            noOfAllowedDropTables = checkTableExistanceInPublication(noOfAllowedDropTables, pubRepTables,repTable.getSchemaQualifiedName(), pubName);
929              repTable.setFilterClause(pub.getFilterClause(sname));
930              repTable.setConflictResolver(pub.getConflictResolver());
931              mdi.setPrimaryColumns(repTable, schema, table);
932              mdi.setAllColumns(repTable, sname.getSchemaName(),sname.getTableName());
933              newPubRepTableList.add(repTable);
934            }
935            return newPubRepTableList;
936          }
937          catch (RepException ex) {
938            throw ex;
939          }
940          catch (Exception ex) {
941            throw new RepException("REP031", new Object[] {pubName, ex.getMessage()});
942          }
943             finally {
944                    connectionPool.returnConnection(connection);
945             }
946        }
947    
948    
949        //check table to be dropped if published or not
950        // check if table to be dropped is the only table published ,if yes then throw Exception
951        //check if user is trying to drop all the tables,if yes throw Exception
952        //if not published throw exception
953    
954        private int checkTableExistanceInPublication(int noOfAllowedDropTables,
955        ArrayList pubTables,SchemaQualifiedName sname,String pubName) throws RepException {
956          if (pubTables.size() > 0) {
957          boolean flag = false;
958            for (int j = 0; j < pubTables.size(); j++) {
959              RepTable repTable = (RepTable) pubTables.get(j);
960              String sqname = repTable.getSchemaQualifiedName().toString();
961              if (sqname.equals(sname.toString())) {
962              flag = true;
963                noOfAllowedDropTables++;
964              }
965              // check if table to be dropped is the only table published ,if yes then throw Exception
966            if (pubTables.size() == 1 && flag == true) {
967              throw new RepException("REP314", new Object[] {sname.toString(),pubName});
968              }
969              //check if user is trying to drop all the tables,if yes throw Exception
970            if (pubTables.size() == noOfAllowedDropTables) {
971               throw new RepException("REP315", new Object[] {pubName});
972             }
973            }
974            //check table to be dropped if published or not
975          if (!flag) {
976            throw new RepException("REP313", new Object[] {sname.toString(),pubName});
977            }
978          }
979          return noOfAllowedDropTables;
980      }
981    
982      private static void initializeLog4j() {
983        try {
984        File f1 = new File("." + File.separator + "log4j.properties");
985      if (!f1.exists()) {
986            f1.createNewFile();
987            Writer output = null;
988            if (f1.canWrite()) {
989              output = new BufferedWriter(new FileWriter(f1));
990              output.write("log4j.rootCategory=Off");
991              output.write("\n");
992              output.write("# log4j.rootCategory=DEBUG,A1");
993              output.write("\n");
994              output.write("log4j.appender.A1=org.apache.log4j.ConsoleAppender");
995              output.write("\n");
996              output.write("log4j.appender.A1.layout=org.apache.log4j.PatternLayout");
997              output.write("\n");
998              output.write("log4j.appender.A1.layout.ConversionPattern=%d{ABSOLUTE} %p [%c{1}] [%M %L] %m%n");
999            }
1000             if (output != null)
1001            output.close();
1002          }
1003          PropertyConfigurator.configureAndWatch("." + File.separator +"log4j.properties");
1004        }
1005        catch (Exception ex) {
1006        RepConstants.writeERROR_FILE(ex);
1007      }
1008      }
1009    
1010    
1011      /**
1012       * createPublication
1013       *
1014       * @param pubName String
1015       * @param tableNames String[]
1016       * @param removeCycleTableNames String[]
1017       * @return _Publication
1018       */
1019      public _Publication createPublication(String pubName, String[] tableNames,
1020                                            String[] removeCycleTableNames) throws RepException {
1021        //If database is not binded
1022          if (connectionPool == null) {
1023            throw new RepException("REP002", null);
1024          }
1025    
1026          //Checks in publication table in database
1027          if (checkPublicationExistance(pubName)) {
1028            //if(pubMap.containsKey(pubName))
1029            throw new RepException("REP014", new Object[] {pubName});
1030          }
1031    
1032          //Checks the properness of given table names
1033          if (tableNames == null || tableNames.length == 0) {
1034            throw new RepException("REP012", new Object[] {pubName});
1035          }
1036    
1037          //repTableList is a list of all reptable that are created
1038          //corresponding to which is included in publication.
1039          ArrayList repTableList = new ArrayList();
1040              Connection connection = null;
1041          try {
1042            // pubTableList contain all tables that are included in publication
1043            ArrayList pubTableList = new ArrayList();
1044                    connection = connectionPool.getConnection(pubName);
1045                    MetaDataInfo mdi = Utility.getDatabaseMataData(connection);
1046            SchemaQualifiedName[] schemaQualifiedNames = new SchemaQualifiedName[tableNames.length];
1047            DirectedGraph directedGraph = new DirectedGraph(tableNames.length);
1048    
1049            for (int i = 0; i < tableNames.length; i++) {
1050              //Converts Table names  to schema qualified name.
1051                      //log.debug("tablenames as passed tablename #" + i + " = " + tableNames[i]);
1052              SchemaQualifiedName sname = new SchemaQualifiedName(mdi, tableNames[i]);
1053                      //log.debug("sname for #" + i + " = " + sname.getTableName());
1054              checkTablesDuplicyInPublication(repTableList, sname, pubName);
1055              String schema = sname.getSchemaName();
1056              String table = sname.getTableName();
1057    
1058              //checks whether the table exists in the database or not.
1059              mdi.checkTableExistance(sname);
1060              schema = sname.getSchemaName();
1061              RepTable repTable = new RepTable(sname, RepConstants.publisher);
1062    
1063              //repTable.setConflictResolver(RepConstants.publisher_wins);
1064              //Sets Sorted primary key columns in repTable.
1065              mdi.setPrimaryColumns(repTable, schema, table);
1066              mdi.setForeignKeyColumns(repTable, schema, table);
1067    
1068              //we are not calling mdi.setAllColumn(),we are doing this while publishing
1069              pubTableList.add(sname.toString());
1070              directedGraph.addVertex(sname);
1071              repTableList.add(repTable);
1072              schemaQualifiedNames[i] = sname;
1073            }
1074            Map importedTableInfoMap = mdi.getImportedTablesInfo(schemaQualifiedNames,directedGraph,removeCycleTableNames);
1075            if (directedGraph.hasCycle()) {
1076              tablesInCycle = directedGraph.TablesInCycle();
1077              throw new RepException("REP0205",new Object[]{tablesInCycle});
1078            }
1079    
1080            List repTablesNamesOrderdAccToHierarcy=Arrays.asList(directedGraph.topologicalSort());
1081    //System.out.println("Ordered Tables :: " +repTablesNamesOrderdAccToHierarcy);
1082            int noOfRepTables = repTablesNamesOrderdAccToHierarcy.size();
1083            RepTable[] tempRep = new RepTable[noOfRepTables];
1084            for (int i = 0; i < noOfRepTables; i++) {
1085              RepTable repTable = (RepTable) repTableList.get(i);
1086              SchemaQualifiedName unOrderedschemaQualifiedName = repTable.getSchemaQualifiedName();
1087              int orderedIndex = repTablesNamesOrderdAccToHierarcy.indexOf(unOrderedschemaQualifiedName);
1088              tempRep[orderedIndex] = repTable;
1089          }
1090    // System.out.println("repTablesOrderdAccToHierarcy::");
1091            ArrayList repTables = new ArrayList(noOfRepTables);
1092            for (int i = noOfRepTables - 1; i >= 0; i--) {
1093              repTables.add(tempRep[i]);
1094    // System.out.println("tempRep[i] ::" + tempRep[i]);
1095            }
1096            //name = InetAddress.getLocalHost().getHostName()+"_"+pubName+"_"+port;
1097            name = InetAddress.getLocalHost().getHostName() + "_" + port;
1098            //Creates publication by setting it's DataBaseHandler &  XMLCreator
1099            Publication publ = new Publication(connectionPool, pubName, name, this);
1100            //Sets publication's repTable elements
1101    //System.out.println(" ACTUAL NO of repTables :: " + repTables.size());
1102            publ.setPublicationTables(repTables);
1103            publ.setCyclic(removeCycleTableNames != null ? removeCycleTableNames.length == 0 ? false : true : false);
1104            pubMap.put(pubName, publ);
1105            return publ;
1106          }
1107          catch (RepException ex) {
1108            throw ex;
1109          }
1110          catch (Exception ex) {
1111            log.error(ex.getMessage(), ex);
1112            RepConstants.writeERROR_FILE(ex);
1113            throw new RepException("REP039", new Object[] {ex.getMessage()});
1114          }
1115            finally {
1116                    connectionPool.returnConnection(connection);
1117            }
1118      }
1119      //Return ArrayList having table having cycles
1120    //  this Arraylist needed if working through GUI
1121      public ArrayList getTablesInCycle(){
1122        ArrayList tablesRelatedInCycle=new ArrayList();
1123        int listSize=tablesInCycle.size();
1124        for (int i = 0; i < listSize; i++) {
1125          if(i!=listSize-1){
1126            tablesRelatedInCycle.add(tablesInCycle.get(i) + "-" +
1127                                     tablesInCycle.get(i + 1));
1128          }
1129        }
1130        return tablesRelatedInCycle;
1131      }
1132    
1133    
1134    
1135        public void shutdown() throws RepException
1136        {
1137            try
1138            {
1139                if (log.isDebugEnabled())
1140                {
1141                    log.debug("Daffodil Replicator Server shutdown called");
1142                }
1143    
1144                String ipAddr = null;
1145    
1146                try
1147                {
1148                    ipAddr = InetAddress.getByName(url).getHostAddress();
1149                }
1150                catch (UnknownHostException ex)
1151                {
1152                    log.error(ex.getMessage(), ex);
1153                    throw new RepException("REP004", new Object[]{ex.getMessage()});
1154                }
1155    
1156                //String url = "rmi://" + ipAddr + ":" + Integer.toString(port) + "/" +ipAddr + "_" + port;
1157                String url = "rmi://0.0.0.0:" + Integer.toString(port) + "/" + ipAddr + "_" + port;
1158    
1159                Naming.unbind(url);
1160    
1161                if (log.isDebugEnabled())
1162                {
1163                    log.debug("Daffodil Replicator Server shutdown finished");
1164                }
1165            }
1166            catch (Exception ex)
1167            {
1168                // seems the server is not running which is ok because we wanted to shut it down anyway 
1169            }
1170        }
1171    
1172      /**
1173       * getRemoteAddress
1174       *
1175       * @return String
1176       */
1177      public String getRemoteAddress() throws RepException {
1178        try {
1179          //return InetAddress.getLocalHost().getHostAddress();
1180          return InetAddress.getLocalHost().getHostName();
1181        }
1182        catch (UnknownHostException ex) {
1183          return null;
1184        }
1185      }
1186    }





























































Powered by Drupal - Theme by Danger4k