该客户端运行在给用户提供unix服务的服务器上。用来读取并收集该服务器上用户的上下线信息,并进行配对整理后发送给服务端汇总。
package com.dms;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
/**
* DMS服务端,用来接收每个客户端发送过来的
* 配对日志并保存在本地文件中
* @author Administrator
*
*/
public class DMSServer {
//属性定义
//用来接收客户端连接的服务端的ServerSocket
private ServerSocket server;
//用来管理处理客户端请求的线程的线程池
private ExecutorService threadPool;
//保存所有客户端发送过来配对日志的文件
private File serverLogFile;
//消息队列
private BlockingQueue<String> messageQueue = new LinkedBlockingQueue<String>();
public DMSServer() throws Exception{
try {
System.out.println("服务端正在初始化...");
//1 解析配置文件server-config.xml
Map<String,String> config = loadConfig();
//2 根据配置文件内容初始化属性
init(config);
System.out.println("服务端初始化完毕...");
} catch (Exception e) {
System.out.println("初始化服务端失败!");
throw e;
}
}
/**
* 构造方法初始化第一步,解析配置文件
* @return 返回的Map中保存的是配置文件中的
* 每一条内容,其中key:标签的名字,
* value为标签中间的文本
* @throws Exception
*/
private Map<String,String> loadConfig() throws Exception{
try {
SAXReader reader = new SAXReader();
Document doc
= reader.read(new File("server-config.xml"));
Element root = doc.getRootElement();
Map<String,String> config
= new HashMap<String,String>();
/*
* 获取<config>标签中的所有子标签
* 并将每一个子标签的名字作为key,中间的
* 文本作为value存入Map集合
*/
List<Element> list = root.elements();
for(Element e : list){
String key = e.getName();
String value = e.getTextTrim();
config.put(key, value);
}
return config;
} catch (Exception e) {
System.out.println("解析配置文件异常!");
e.printStackTrace();
throw e;
}
}
/**
* 构造方法初始化第二步,根据配置项初始化属性
* @param config
* @throws Exception
*/
private void init(Map<String,String> config) throws Exception{
/*
* 用配置文件中的<logrecfile>初始化属性:serverLogFile
* 用配置文件中的<threadsum>初始化属性:threadPool,这里创建固定大小线程池。该值作为线程池线程数量
* 用配置文件中的<serverport>初始化属性:server,这里这个值为ServerSocket的服务端口
*/
this.server = new ServerSocket(
Integer.parseInt(config.get("serverport"))
);
this.serverLogFile = new File(
config.get("logrecfile")
);
this.threadPool = Executors.newFixedThreadPool(
Integer.parseInt(config.get("threadsum"))
);
}
/**
* 服务端开始工作的方法
* @throws Exception
*/
public void start() throws Exception{
/*
* 实现要求:
* 首先单独启动一个线程,用来运行SaveLogHandler
* 这个任务,目的是保存所有配对日志
* 然后开始循环监听服务端端口,一旦一个客户端连接了,
* 就实例化一个ClientHander,然后将该任务交给线程池
* 使其分配线程来处理与该客户端的交互。
*
*/
try {
System.out.println("服务端开始工作...");
SaveLogHandler slh=new SaveLogHandler();
new Thread(slh).start();
while(true){
Socket socket=server.accept();
threadPool.execute(new ClientHandler(socket));
}
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
public static void main(String[] args) {
try {
DMSServer server = new DMSServer();
server.start();
} catch (Exception e) {
System.out.println("启动服务端失败!");
}
}
/**
* 该线程负责从消息队列中取出每一条配对日志,
* 并存入到serverLogFile文件
* @author Administrator
*
*/
private class SaveLogHandler implements Runnable{
public void run(){
PrintWriter pw = null;
try {
pw = new PrintWriter(
new FileOutputStream(
serverLogFile,true
)
);
while(true){
if(messageQueue.size()>0){
pw.println(messageQueue.poll());
}else{
pw.flush();
Thread.sleep(500);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally{
if(pw != null){
pw.close();
}
}
}
}
/**
* 处理一个指定客户端请求
* @author Administrator
*
*/
private class ClientHandler implements Runnable{
private Socket socket;
public ClientHandler(Socket socket){
this.socket = socket;
}
public void run(){
/*
* 思路:
* 首先接收客户端发送过来的所有配对日志,
* 直到读取到"OVER"为止,然后将这些配对
* 日志保存到本地的文件中,并回复客户端
* "OK"
* 执行步骤:
* 1:通过Socket创建输出流,用来给客户端
* 发送响应
* 2:通过Socket创建输入流,读取客户端发送
* 过来的日志
* 3:循环读取客户端发送过来的每一行字符串,并
* 先判断是否为字符串"OVER",若不是,则是
* 一条配对日志,那么保存到本地文件,若是,
* 则停止读取。
* 4:成功读取所有日志后回复客户端"OK"
*/
PrintWriter pw = null;
try {
//1
pw = new PrintWriter(
new OutputStreamWriter(
socket.getOutputStream(),"UTF-8"
)
);
//2
BufferedReader br = new BufferedReader(
new InputStreamReader(
socket.getInputStream(),"UTF-8"
)
);
//3
String message = null;
while((message = br.readLine())!=null){
if("OVER".equals(message)){
break;
}
//将该日志写入文件保存
messageQueue.offer(message);
}
//4
pw.println("OK");
pw.flush();
} catch (Exception e) {
e.printStackTrace();
pw.println("ERROR");
pw.flush();
} finally{
try {
//与客户端断开连接释放资源
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
package com.dms;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import com.dms.bo.LogData;
import com.dms.bo.LogRec;
/**
* 该客户端运行在给用户提供unix服务的服务器上。
* 用来读取并收集该服务器上用户的上下线信息,并
* 进行配对整理后发送给服务端汇总。
* @author Administrator
*
*/
public class DMSClient {
//属性定义
//第一步:解析日志所需属性
//unix系统日志文件
private File logFile;
//保存解析后日志的文件
private File textLogFile;
//书签文件
private File lastPositionFile;
//每次解析日志的条目数
private int batch;
//第二步:配对日志所需要属性
//保存配对日志的文件
private File logRecFile;
//保存未配对日志的文件
private File loginLogFile;
//第三步:发送日志所需要属性
//服务端地址
private String serverHost;
//服务端端口
private int serverPort;
/**
* 构造方法,用来初始化客户端
* @throws Exception
*/
public DMSClient() throws Exception{
try {
//1 解析配置文件config.xml
Map<String,String> config = loadConfig();
//打桩
System.out.println(config);
//2 根据配置文件内容初始化属性
init(config);
} catch (Exception e) {
System.out.println("初始化失败!");
throw e;
}
}
/**
* 构造方法初始化第二步,根据配置项初始化属性
* @param config
* @throws Exception
*/
private void init(Map<String,String> config) throws Exception{
try {
logFile = new File(
config.get("logfile")
);
textLogFile = new File(
config.get("textlogfile")
);
lastPositionFile = new File(
config.get("lastpositionfile")
);
batch = Integer.parseInt(
config.get("batch")
);
logRecFile = new File(
config.get("logrecfile")
);
loginLogFile = new File(
config.get("loginlogfile")
);
serverHost = config.get("serverhost");
serverPort = Integer.parseInt(
config.get("serverport")
);
} catch (Exception e) {
System.out.println("初始化属性失败!");
e.printStackTrace();
throw e;
}
}
/**
* 构造方法初始化第一步,解析配置文件
* @return 返回的Map中保存的是配置文件中的
* 每一条内容,其中key:标签的名字,
* value为标签中间的文本
* @throws Exception
*/
private Map<String,String> loadConfig() throws Exception{
try {
SAXReader reader = new SAXReader();
Document doc
= reader.read(new File("config.xml"));
Element root = doc.getRootElement();
Map<String,String> config
= new HashMap<String,String>();
/*
* 获取<config>标签中的所有子标签
* 并将每一个子标签的名字作为key,中间的
* 文本作为value存入Map集合
*/
List<Element> list = root.elements();
for(Element e : list){
String key = e.getName();
String value = e.getTextTrim();
config.put(key, value);
}
return config;
} catch (Exception e) {
System.out.println("解析配置文件异常!");
e.printStackTrace();
throw e;
}
}
/**
* 客户端开始工作的方法
* 循环执行三步:
* 1:解析日志
* 2:配对日志
* 3:发送日志
*/
public void start(){
parseLogs();
matchLogs();
sendLogs();
// while(true){
// //解析日志
// if(!parseLogs()){
// continue;
// }
// //配对日志
// if(!matchLogs()){
// continue;
// }
// //发送日志
// sendLogs();
// }
}
/**
* 第三步:发送日志
* @return true:发送成功
* false:发送失败
*/
private boolean sendLogs(){
/*
* 实现思路:
* 将logRecFile文件中的所有配对日志读取
* 出来然后连接上服务端并发送过去,若服务端
* 全部接收,就可以将该文件删除,表示发送
* 完毕了。
* 实现步骤:
* 1:logRecFile文件必须存在
* 2:将所有配对日志读取出来并存入一个集合
* 等待发送
* 3:通过Socket连接服务端
* 4:创建输出流
* 5:顺序将所有配对日志按行发送给服务端
* 6:单独发送一个字符串"OVER"表示所有日志
* 均已发送完毕
* 7:创建输入流
* 8:读取服务端发送回来的响应字符串
* 9:若响应的字符串为"OK",表示服务端正常
* 接收了所有日志,这时就可以将logRecFile
* 文件删除并返回true表示发送完毕。
*
*/
Socket socket = null;
try {
//1
if(!logRecFile.exists()){
System.out.println(logRecFile+"不存在!");
return false;
}
//2
List<String> matches
= IOUtil.loadLogRec(logRecFile);
//3
socket = new Socket(serverHost,serverPort);
//4
PrintWriter pw = new PrintWriter(
new OutputStreamWriter(
socket.getOutputStream(),"UTF-8"
)
);
//5
for(String log : matches){
pw.println(log);
}
//6
pw.println("OVER");
pw.flush();
//7
BufferedReader br = new BufferedReader(
new InputStreamReader(
socket.getInputStream(),"UTF-8"
)
);
//8
String response = br.readLine();
//9
if("OK".equals(response)){
logRecFile.delete();
return true;
}else{
System.out.println("发送日志失败!");
return false;
}
} catch (Exception e) {
System.out.println("发送日志失败!");
e.printStackTrace();
} finally{
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return false;
}
/**
* 第二步:配对日志
* @return true:配对成功
* false:配对失败
*/
private boolean matchLogs(){
/*
* 实现思路:
* 将第一步解析的新日志,与上次为配对成功
* 的登入日志全部读取出来,然后再按照user,
* pid相同,type一个是7,一个是8进行配对。
* 只要能找到类型为8的,一定可以找到一个
* 能与之配对的登入日志。
*
* 实现步骤:
* 1:必要的判断
* 1.1:logRecFile是否存在,存在则不再
* 进行新的配对工作,避免覆盖。
* 1.2:textLogFile文件必须存在。
* 2:读取textLogFile将日志读取出来,并
* 存入到集合中。(若干LogData实例)
* 3:若loginLogFile文件若存在,则说明
* 有上次未配对成功的日志,也将其读取
* 出来存入集合等待一起配对
* 4:配对工作
* 4.1:创建一个集合,用于保存所有配对日志
* 4.2:创建两个Map分别保存登入日志与登出日志
* 4.3:遍历所有待配对的日志,按照登入与登出
* 分别存入两个Map中,
* 其中key:user,pid
* value:LogData实例
* 4.4:遍历登出Map,并根据每条登出日志的key
* 去登入Map中找到对应的登入日志,并
* 以一个LogRec实例保存该配对日志,然后
* 存入配对日志的集合中。并将该配对日志
* 中的登入日志从登入Map中删除。这样一来
* 登入Map中应当只剩下没有配对的了。
* 5:将配对日志写入到logRecFile中
* 6:将所有未配对日志写入到loginLogFile中
* 7:将textLogFile文件删除
* 8:返回true,表示配对完毕
*
*/
try {
//1
//1.1
if(logRecFile.exists()){
return true;
}
//1.2
if(!textLogFile.exists()){
System.out.println(textLogFile+"不存在!");
return false;
}
//2
List<LogData> list
= IOUtil.loadLogData(textLogFile);
//3
if(loginLogFile.exists()){
list.addAll(
IOUtil.loadLogData(loginLogFile)
);
}
//4
//4.1
List<LogRec> matches
= new ArrayList<LogRec>();
//4.2
Map<String,LogData> loginMap
= new HashMap<String,LogData>();
Map<String,LogData> logoutMap
= new HashMap<String,LogData>();
//4.3
for(LogData logData : list){
String key = logData.getUser()+","+
logData.getPid();
if(logData.getType()==LogData.TYPE_LOGIN){
loginMap.put(key, logData);
}else if(logData.getType()==LogData.TYPE_LOGOUT){
logoutMap.put(key, logData);
}
}
//4.4
Set<Entry<String,LogData>> entrySet
= logoutMap.entrySet();
for(Entry<String,LogData> e : entrySet){
LogData logout = e.getValue();
LogData login = loginMap.remove(e.getKey());
LogRec logRec = new LogRec(login,logout);
matches.add(logRec);
}
//5
IOUtil.saveCollection(matches, logRecFile);
//6
IOUtil.saveCollection(
loginMap.values(),loginLogFile
);
//7
textLogFile.delete();
//8
return true;
} catch (Exception e) {
System.out.println("配对日志失败!");
e.printStackTrace();
}
return false;
}
/**
* 第一步:解析日志
* @return true:解析成功
* false:解析失败
*/
private boolean parseLogs(){
/*
* 实现思路:
* 循环读取batch条日志,然后将每条日志中的
* 5个信息解析出来,最终组成一个字符串,以
* 行为单位,写入到textLogFile文件中
*
* 实现步骤:
* 1:必要的判断工作
* 1.1:为了避免解析的日志还没有被使用,而
* 第一步又重复执行导致之前日志被覆盖
* 的问题,这里需要判断,若保存解析后
* 的日志文件存在,则第一步不再执行。
* 该日志文件会在第二步配对完毕后删除。
* 1.2:logFile文件必须存在(wtmpx文件)
* 1.3:是否还有日志可以解析
* 2:创建RandomAccessFile来读取logFile
* 3:将指针移动到上次最后读取的位置,准备
* 开始新的解析工作
* 4:解析工作
* 4.1:创建一个List集合,用于保存解析后
* 的每一条日志(LogData实例)
* 4.2:循环batch次,解析每条日志中的
* 5项内容(user,pid,type,time,host)
* 并用一个LogData实例保存,然后将
* 该LogData实例存入集合
* 5:将集合中的所有的日志以行为单位保存到
* textLogFile中
* 6:保存书签信息
* 7:返回true,表示工作完毕
*
*/
RandomAccessFile raf = null;
try {
//1
//1.1
if(textLogFile.exists()){
return true;
}
//1.2
if(!logFile.exists()){
System.out.println(logFile+"不存在!");
return false;
}
//1.3
long lastPosition = hasLogs();
//打桩
// System.out.println(
// "lastPosition:"+lastPosition
// );
if(lastPosition<0){
System.out.println("没有日志可以解析了!");
return false;
}
//2
raf = new RandomAccessFile(logFile,"r");
//3
raf.seek(lastPosition);
//4
List<LogData> list
= new ArrayList<LogData>();
for(int i=0;i<batch;i++){
//每次解析前都判断是否还有日志可以解析
if(logFile.length()-lastPosition
<LogData.LOG_LENGTH
){
break;
}
//解析user
raf.seek(lastPosition+LogData.USER_OFFSET);
String user
= IOUtil.readString(
raf, LogData.USER_LENGTH
).trim();
//解析PID
raf.seek(lastPosition+LogData.PID_OFFSET);
int pid = raf.readInt();
//解析TYPE
raf.seek(lastPosition+LogData.TYPE_OFFSET);
short type = raf.readShort();
//解析TIME
raf.seek(lastPosition+LogData.TIME_OFFSET);
int time = raf.readInt();
//解析HOST
raf.seek(lastPosition+LogData.HOST_OFFSET);
String host
= IOUtil.readString(
raf, LogData.HOST_LENGTH
).trim();
LogData log = new LogData(user, pid, type, time, host);
list.add(log);
//打桩
// System.out.println(log);
//当解析完一条日志后,更新lastPosition
lastPosition = raf.getFilePointer();
}
//5
IOUtil.saveCollection(list, textLogFile);
//6 保存书签文件
IOUtil.saveLong(
lastPosition, lastPositionFile);
//7
return true;
} catch (Exception e) {
System.out.println("解析日志失败!");
e.printStackTrace();
} finally{
if(raf != null){
try {
raf.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return false;
}
/**
* 第一步解析日志中的一个环节,
* 根据书签文件记录的位置判断是否还有
* 日志可以解析,若有,则将上次最后的位置
* 返回,若没有则返回-1。
* @return
*/
private long hasLogs(){
try {
/*
* 若lastPositionFile不存在,则说明
* 从来没有解析过,那么从头开始解析即可
*/
if(!lastPositionFile.exists()){
return 0;
}
long lastPosition
= IOUtil.readLong(lastPositionFile);
if(logFile.length()-lastPosition
>=LogData.LOG_LENGTH){
return lastPosition;
}
} catch (Exception e) {
e.printStackTrace();
}
return -1;
}
public static void main(String[] args) {
try {
DMSClient client = new DMSClient();
client.start();
} catch (Exception e) {
System.out.println("客户端运行失败!");
}
}
}
package com.dms;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import com.dms.bo.LogData;
/**
* 该类是一个工具类,负责客户端的IO操作
* @author Administrator
*
*/
public class IOUtil {
/**
* 从给定的文件中读取每一行字符串(配对日志)
* 并存入一个集合后返回
* @param file
* @return
* @throws Exception
*/
public static List<String> loadLogRec(File file) throws Exception{
BufferedReader br = null;
try {
br = new BufferedReader(
new InputStreamReader(
new FileInputStream(
file
)
)
);
List<String> list
= new ArrayList<String>();
String line = null;
while((line = br.readLine())!=null){
list.add(line);
}
return list;
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally{
if(br != null){
br.close();
}
}
}
/**
* 从给定的文件中读取每一条配对日志,并存入
* 一个集合中然后返回。
* @param file
* @return
* @throws Exception
*/
public static List<LogData> loadLogData(File file) throws Exception{
BufferedReader br = null;
try {
br = new BufferedReader(
new InputStreamReader(
new FileInputStream(
file
)
)
);
List<LogData> list = new ArrayList<LogData>();
String line = null;
while((line = br.readLine())!=null){
LogData logData = new LogData(line);
list.add(logData);
}
return list;
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally{
if(br!=null){
br.close();
}
}
}
/**
* 将指定的long值以字符串的形式写入到
* 给定文件的第一行
* @param l
* @param file
* @throws Exception
*/
public static void saveLong(
long lon,File file) throws Exception{
PrintWriter pw = null;
try {
pw = new PrintWriter(file);
pw.println(lon);
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally{
if(pw != null){
pw.close();
}
}
}
/**
* 将集合中每个元素的toString方法返回的字符串
* 以行为单位写入到指定文件中。
* @param c
* @param file
* @throws Exception
*/
public static void saveCollection(
Collection c,File file) throws Exception{
PrintWriter pw = null;
try {
pw = new PrintWriter(file);
for(Object o : c){
pw.println(o);
}
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally{
if(pw != null){
pw.close();
}
}
}
/**
* 从给定的RandomAccessFile当前位置开始连续
* 读取length个字节,并转换为字符串后返回
* @param raf
* @param length
* @return
* @throws Exception
*/
public static String readString(
RandomAccessFile raf,int length) throws Exception{
try {
byte[] data = new byte[length];
raf.read(data);
return new String(data,"ISO8859-1");
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
/**
* 从给定文件中读取第一行字符串,然后将其
* 转换为一个long值后返回
* @param file
* @return
* @throws Exception
*/
public static long readLong(File file) throws Exception{
BufferedReader br = null;
try {
br = new BufferedReader(
new InputStreamReader(
new FileInputStream(
file
)
)
);
String line = br.readLine();
return Long.parseLong(line);
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally{
if(br != null){
br.close();
}
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<config>
<!-- unix系统日志文件名 -->
<logfile>wtmpx</logfile>
<!-- 保存解析后日志的文件名 -->
<textlogfile>log.txt</textlogfile>
<!-- 书签文件名 -->
<lastpositionfile>last-position.txt</lastpositionfile>
<!-- 每次解析日志的条目数 -->
<batch>10</batch>
<!-- 配对日志文件名 -->
<logrecfile>logrec.txt</logrecfile>
<!-- 未配对日志文件名 -->
<loginlogfile>login.txt</loginlogfile>
<!-- 服务端地址 -->
<serverhost>localhost</serverhost>
<!-- 服务端端口 -->
<serverport>8088</serverport>
</config>
<?xml version="1.0" encoding="UTF-8"?>
<config>
<!-- 服务端保存配对日志文件的文件名 -->
<logrecfile>server-logs.txt</logrecfile>
<!-- 线程池线程数量 -->
<threadsum>30</threadsum>
<!-- 服务端端口 -->
<serverport>8088</serverport>
</config>