網(wǎng)上有很多關(guān)于pos機(jī)系統(tǒng)源碼,美團(tuán)leaf源碼解析的知識(shí),也有很多人為大家解答關(guān)于pos機(jī)系統(tǒng)源碼的問題,今天pos機(jī)之家(www.shbwcl.net)為大家整理了關(guān)于這方面的知識(shí),讓我們一起來看下吧!
本文目錄一覽:
pos機(jī)系統(tǒng)源碼
leaf提供了兩種id生成方式一種是基于mysql分段雙buffer模式,一種是基于zookeeper的。
雪花id生成器美團(tuán)使用的是zookeeper實(shí)現(xiàn)的,zookeeper在此中間件中扮演的角色總結(jié)如下,用來存儲(chǔ)每個(gè)分布式節(jié)點(diǎn)的ip,port信息,主要是每天機(jī)器的時(shí)間戳,這個(gè)信息用于保障每個(gè)節(jié)點(diǎn)生成的id不會(huì)出現(xiàn)回?fù)墁F(xiàn)象,出現(xiàn)回?fù)艿脑蚍治鋈缦?/p>
41bit的計(jì)算方式是機(jī)器的當(dāng)前時(shí)間減去系統(tǒng)初始化的時(shí)間的時(shí)間戳,leaf給的是 :Thu Nov 04 2010 09:42:54 GMT+0800 (中國(guó)標(biāo)準(zhǔn)時(shí)間) 1288834974657L的差值填充41bit的空間,從id的組成我們可以知道10bit的工作機(jī)器id不同就能保障一定的程度id不同性,但是人如果41bit的位置出現(xiàn)時(shí)間回?fù)芎竽敲磫我粀orker生成的id就可能會(huì)出現(xiàn)重復(fù)的id。
SnowflakeZookeeperHolder
這個(gè)類主要是處理zk的連接,以及數(shù)據(jù)存儲(chǔ)策略,包括如果分配workerid,以及workerid也會(huì)存儲(chǔ)在每個(gè)節(jié)點(diǎn)的本地化文件中,從代碼中我們可以知道,每個(gè)zk的path是這樣的/snowflake/leafname/forever/ip:port-0000000001 后面的數(shù)字就是workerid由zk分配的
public class SnowflakeZookeeperHolder { private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeZookeeperHolder.class); private String zk_Addressnode = null;//保存自身的key ip:port-000000001 private String listenAddress = null;//保存自身的key ip:port private int workerID; private static final String PREFIX_ZK_PATH = "/snowflake/" + PropertyFactory.getProperties().getProperty("leaf.name"); private static final String PROP_PATH = System.getProperty("java.io.tmpdir") + File.separator + PropertyFactory.getProperties().getProperty("leaf.name") + "/leafconf/{port}/workerID.properties"; private static final String PATH_FOREVER = PREFIX_ZK_PATH + "/forever";//保存所有數(shù)據(jù)持久的節(jié)點(diǎn) private String ip; private String port; private String connectionString; private long lastUpdateTime; public SnowflakeZookeeperHolder(String ip, String port, String connectionString) { this.ip = ip; this.port = port; this.listenAddress = ip + ":" + port; this.connectionString = connectionString; } /** * 啟動(dòng)應(yīng)用使用zk的curator客戶端連接zk,判斷l(xiāng)eaf指定業(yè)務(wù)的根節(jié)點(diǎn)是否存在,leaf中使用的劃分邏輯如下 * /snowflake/leafname/forever/ip:port-0000000001 * /snowflake/leafname(不同業(yè)務(wù)可以使用不同名字)訂單,用戶/forever/ip:port(這里指的是提供id生產(chǎn)服務(wù)的機(jī)器)-0000000001(順序節(jié)點(diǎn)序號(hào)) * 里面的內(nèi)容存的是endpoint內(nèi)容{"ip","xxx.xxx.xxx.xxx","port":"8080","timestamp":"timestamp"} * 先掃描業(yè)務(wù)目錄,如果業(yè)務(wù)目錄不存在那么說明第一次啟動(dòng)這個(gè)業(yè)務(wù),因此當(dāng)前主機(jī)要把自己的信息保存進(jìn)去,默認(rèn)id為0 */ public boolean init() { try { CuratorFramework curator = createWithOptions(connectionString, new RetryUntilElapsed(1000, 4), 10000, 6000); curator.start(); Stat stat = curator.checkExists().forPath(PATH_FOREVER); if (stat == null) { //不存在根節(jié)點(diǎn),機(jī)器第一次啟動(dòng),創(chuàng)建/snowflake/ip:port-000000000,并上傳數(shù)據(jù) zk_AddressNode = createNode(curator); //worker id 默認(rèn)是0 //guozc 潛在并發(fā)問題???jī)蓚€(gè)節(jié)點(diǎn)同時(shí)去創(chuàng)建節(jié)點(diǎn)都成功了,因?yàn)閣orker默認(rèn)是0可能會(huì)造成本地文件存儲(chǔ)的id為0,極端情況下? updateLocalWorkerID(workerID); //定時(shí)上報(bào)本機(jī)時(shí)間給forever節(jié)點(diǎn) //定時(shí)任務(wù)每三秒鐘上報(bào)一下本機(jī)信息,里面關(guān)鍵信息是每次上報(bào)的時(shí)間戳,防止數(shù)顯時(shí)鐘回?fù)? ScheduledUploadData(curator, zk_AddressNode); return true; } else { //業(yè)務(wù)目錄存在那么就檢查是否有自己的節(jié)點(diǎn) Map<String, Integer> nodeMap = Maps.newHashMap();//ip:port->00001 Map<String, String> realNode = Maps.newHashMap();//ip:port->(ipport-000001) //存在根節(jié)點(diǎn),先檢查是否有屬于自己的根節(jié)點(diǎn) List<String> keys = curator.getChildren().forPath(PATH_FOREVER); for (String key : keys) { String[] nodeKey = key.split("-"); realNode.put(nodeKey[0], key); nodeMap.put(nodeKey[0], Integer.parseInt(nodeKey[1])); } Integer workerid = nodeMap.get(listenAddress); if (workerid != null) { //有自己的節(jié)點(diǎn),zk_AddressNode=ip:port zk_AddressNode = PATH_FOREVER + "/" + realNode.get(listenAddress); workerID = workerid;//啟動(dòng)worder時(shí)使用會(huì)使用 if (!checkInitTimeStamp(curator, zk_AddressNode)) { throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time"); } //準(zhǔn)備創(chuàng)建臨時(shí)節(jié)點(diǎn) doService(curator); updateLocalWorkerID(workerID); LOGGER.info("[Old NODE]find forever node have this endpoint ip-{} port-{} workid-{} childnode and start SUCCESS", ip, port, workerID); } else { //表示新啟動(dòng)的節(jié)點(diǎn),創(chuàng)建持久節(jié)點(diǎn) ,不用check時(shí)間 String newNode = createNode(curator); zk_AddressNode = newNode; String[] nodeKey = newNode.split("-"); workerID = Integer.parseInt(nodeKey[1]); doService(curator); updateLocalWorkerID(workerID); LOGGER.info("[New NODE]can not find node on forever node that endpoint ip-{} port-{} workid-{},create own node on forever node and start SUCCESS ", ip, port, workerID); } } } catch (Exception e) { LOGGER.error("Start node ERROR {}", e); try { Properties properties = new Properties(); properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + "")))); workerID = Integer.valueOf(properties.getProperty("workerID")); LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID); } catch (Exception e1) { LOGGER.error("Read file error ", e1); return false; } } return true; } private void doService(CuratorFramework curator) { ScheduledUploadData(curator, zk_AddressNode);// /snowflake_forever/ip:port-000000001 } private void ScheduledUploadData(final CuratorFramework curator, final String zk_AddressNode) { Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "schedule-upload-time"); thread.setDaemon(true); return thread; } }).scheduleWithFixedDelay(new Runnable() { @Override public void run() { updateNewData(curator, zk_AddressNode); } }, 1L, 3L, TimeUnit.SECONDS);//每3s上報(bào)數(shù)據(jù) } /** * 檢查zookeeper中數(shù)據(jù)是否小于當(dāng)前機(jī)器的系統(tǒng)時(shí)間,因?yàn)槊總€(gè)機(jī)器在zk中都有一個(gè)自己的節(jié)點(diǎn)用于存儲(chǔ)endpoint數(shù)據(jù) */ private boolean checkInitTimeStamp(CuratorFramework curator, String zk_AddressNode) throws Exception { byte[] bytes = curator.getData().forPath(zk_AddressNode); Endpoint endPoint = deBuildData(new String(bytes)); //該節(jié)點(diǎn)的時(shí)間不能小于最后一次上報(bào)的時(shí)間 return !(endPoint.getTimestamp() > System.currentTimeMillis()); } /** * 創(chuàng)建持久順序節(jié)點(diǎn) ,并把節(jié)點(diǎn)數(shù)據(jù)放入 value * * @param curator * @return * @throws Exception */ private String createNode(CuratorFramework curator) throws Exception { try { return curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(PATH_FOREVER + "/" + listenAddress + "-", buildData().getBytes()); } catch (Exception e) { LOGGER.error("create node error msg {} ", e.getMessage()); throw e; } } private void updateNewData(CuratorFramework curator, String path) { try { if (System.currentTimeMillis() < lastUpdateTime) { return; } curator.setData().forPath(path, buildData().getBytes()); lastUpdateTime = System.currentTimeMillis(); } catch (Exception e) { LOGGER.info("update init data error path is {} error is {}", path, e); } } /** * 構(gòu)建需要上傳的數(shù)據(jù) * * @return */ private String buildData() throws JsonProcessingException { Endpoint endpoint = new Endpoint(ip, port, System.currentTimeMillis()); ObjectMapper mapper = new ObjectMapper(); String json = mapper.writeValueAsString(endpoint); return json; } /** * 將json字符串轉(zhuǎn)換為endpoint對(duì)象 */ private Endpoint deBuildData(String json) throws IOException { ObjectMapper mapper = new ObjectMapper(); Endpoint endpoint = mapper.readValue(json, Endpoint.class); return endpoint; } /** * 在節(jié)點(diǎn)文件系統(tǒng)上緩存一個(gè)workid值,zk失效,機(jī)器重啟時(shí)保證能夠正常啟動(dòng) * * @param workerID */ private void updateLocalWorkerID(int workerID) { File leafConfFile = new File(PROP_PATH.replace("{port}", port)); boolean exists = leafConfFile.exists(); LOGGER.info("file exists status is {}", exists); if (exists) { try { FileUtils.writeStringToFile(leafConfFile, "workerID=" + workerID, false); LOGGER.info("update file cache workerID is {}", workerID); } catch (IOException e) { LOGGER.error("update file cache error ", e); } } else { //不存在文件,父目錄頁肯定不存在 try { boolean mkdirs = leafConfFile.getParentFile().mkdirs(); LOGGER.info("init local file cache create parent dis status is {}, worker id is {}", mkdirs, workerID); if (mkdirs) { if (leafConfFile.createNewFile()) { FileUtils.writeStringToFile(leafConfFile, "workerID=" + workerID, false); LOGGER.info("local file cache workerID is {}", workerID); } } else { LOGGER.warn("create parent dir error==="); } } catch (IOException e) { LOGGER.warn("craete workerID conf file error", e); } } } private CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) { return CuratorFrameworkFactory.builder().connectString(connectionString) .retryPolicy(retryPolicy) .connectionTimeoutMs(connectionTimeoutMs) .sessionTimeoutMs(sessionTimeoutMs) .build(); } /** * 上報(bào)數(shù)據(jù)結(jié)構(gòu) */ static class Endpoint { private String ip; private String port; private long timestamp; public Endpoint() { } public Endpoint(String ip, String port, long timestamp) { this.ip = ip; this.port = port; this.timestamp = timestamp; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public String getPort() { return port; } public void setPort(String port) { this.port = port; } public long getTimestamp() { return timestamp; } public void setTimestamp(long timestamp) { this.timestamp = timestamp; } } public String getZk_AddressNode() { return zk_AddressNode; } public void setZk_AddressNode(String zk_AddressNode) { this.zk_AddressNode = zk_AddressNode; } public String getListenAddress() { return listenAddress; } public void setListenAddress(String listenAddress) { this.listenAddress = listenAddress; } public int getWorkerID() { return workerID; } public void setWorkerID(int workerID) { this.workerID = workerID; } public static void main(String[] args) { try { System.out.println(Integer.parseInt("0000000008")); } catch (Exception e) { e.printStackTrace(); } }}
另外一個(gè)核心類是雪花的實(shí)現(xiàn)細(xì)節(jié),包括了一個(gè)id是如何生成的,還有就是時(shí)間是如何更新到zk的,還有就是人如果當(dāng)前毫秒中id分配不夠了leaf是如何處理的
/** * 使用位運(yùn)算拼接一個(gè)long類型的id出來,主要是利用時(shí)間做高41位的內(nèi)容,中間是10bit的機(jī)器id,也基本夠用了,一個(gè)服務(wù)的id生成理論上不會(huì)超過1023個(gè)服務(wù)節(jié)點(diǎn) * 最后的12bit用來做遞增 */public class SnowflakeIDGenImpl implements IDGen { @Override public boolean init() { return true; } private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeIDGenImpl.class); //開始時(shí)間戳 //個(gè)數(shù)字可以指定為任意小于當(dāng)前時(shí)間的數(shù)字,這樣就能讓競(jìng)爭(zhēng)對(duì)手無法知道我們的id信息了,這就解決了基于mysql的segement方案造成的容易被競(jìng)爭(zhēng)對(duì)手監(jiān)控的問題了,因?yàn)橛袝r(shí)間維度的參與,對(duì)手不知道我們每時(shí)每刻的id發(fā)放信息 private final long twepoch; //wokerid占用的位數(shù) private final long workerIdBits = 10L; //worker最大的id1023 private final long maxWorkerId = ~(-1L << workerIdBits);//最大能夠分配的workerid =1023 //每毫秒的id數(shù)字最大值 private final long sequenceBits = 12L; //workerid的偏移量 private final long workerIdShift = sequenceBits; //時(shí)間戳位移數(shù) private final long timestampLeftShift = sequenceBits + workerIdBits; //每個(gè)毫秒生成id數(shù)的掩碼,用來進(jìn)行與運(yùn)算提高運(yùn)算效率,他讓自己的高位是1,其他都是0那么在與的時(shí)候如果沒滿則是sequence 如果是0說明與的那個(gè)數(shù)字二進(jìn)制后12位全是0了,也就是滿了,因此會(huì)休息一個(gè)死循環(huán)的時(shí)間然后繼續(xù)生成id private final long sequenceMask = ~(-1L << sequenceBits); //工作節(jié)點(diǎn)的id private long workerId; //每個(gè)毫秒自增id數(shù)字 private long sequence = 0L; //保存上次生成id時(shí)候的時(shí)間戳 private long lastTimestamp = -1L; //new一個(gè)隨機(jī)函數(shù)對(duì)象,多線程公用,并發(fā)性問題交給了synchronized關(guān)鍵字,并且公用對(duì)象后降低了new的成本 private static final Random RANDOM = new Random(); public SnowflakeIDGenImpl(String zkAddress, int port) { //Thu Nov 04 2010 09:42:54 GMT+0800 (中國(guó)標(biāo)準(zhǔn)時(shí)間) this(zkAddress, port, 1288834974657L); } /** * @param zkAddress zk地址 * @param port snowflake監(jiān)聽端口 * @param twepoch * * 初始化應(yīng)用,主要是SnowflakeZookeeperHolder 里面的初始化,包括節(jié)點(diǎn)的創(chuàng)建,或者數(shù)據(jù)的同步,還有主要是完成時(shí)間的檢查,方式工作節(jié)點(diǎn)始終回?fù)? * 并且將workerid進(jìn)行賦值,方便生成id時(shí)候使用 */ public SnowflakeIDGenImpl(String zkAddress, int port, long twepoch) { this.twepoch = twepoch; Preconditions.checkArgument(timeGen() > twepoch, "Snowflake not support twepoch gt currentTime"); final String ip = Utils.getIp(); SnowflakeZookeeperHolder holder = new SnowflakeZookeeperHolder(ip, String.valueOf(port), zkAddress); LOGGER.info("twepoch:{} ,ip:{} ,zkAddress:{} port:{}", twepoch, ip, zkAddress, port); boolean initFlag = holder.init(); if (initFlag) { workerId = holder.getWorkerID(); LOGGER.info("START SUCCESS USE ZK WORKERID-{}", workerId); } else { Preconditions.checkArgument(initFlag, "Snowflake Id Gen is not init ok"); } Preconditions.checkArgument(workerId >= 0 && workerId <= maxWorkerId, "workerID must gte 0 and lte 1023"); } /** * 核心方法這個(gè)就是獲得雪花id的方法,因?yàn)槭前凑諘r(shí)間軸進(jìn)行發(fā)布的,因此不存在不同的業(yè)務(wù)key的隔離,因?yàn)樗械臉I(yè)務(wù)的id都不會(huì)重復(fù),(就是這么的任性) * 先取到系統(tǒng)時(shí)間戳,然后跟對(duì)象中的 lastTimestamp比較如果系統(tǒng)時(shí)間比對(duì)象時(shí)間回?fù)芰?毫秒那么久稍作休息wait一下,也就是等待兩倍的毫秒數(shù),因?yàn)樽笠苿?dòng)1二進(jìn)制翻一倍, * 如果線程醒過來后還是有偏移量那么就返回錯(cuò)誤。如果偏移量超過5毫秒,那么代表著偏移量太大,那么就返回錯(cuò)誤, * 如果對(duì)象中的 lastTimestamp 與當(dāng)前機(jī)器中系統(tǒng)時(shí)間一樣,這里面說明一下,這種情況下肯定是比較高的并發(fā)情況下的必然了,因?yàn)槊看伟l(fā)放id后對(duì)象時(shí)間都會(huì)被置為當(dāng)時(shí)取的系統(tǒng)時(shí)間 * 也就是一個(gè)毫秒中會(huì)發(fā)憷多個(gè)id,那么處理邏輯就是給 sequence不停的加一,這里面的與其實(shí)就是2的12次方-1,也就是整了個(gè)sequence的最大值,這樣出現(xiàn)0代表 sequence + 1變成了 * 2的12次方-1了,那么也就是意味著并發(fā)真的很大,一毫秒中的id被打光了,那么系統(tǒng)就調(diào)用 tilNextMillis 進(jìn)行死循環(huán)的等待,因?yàn)檫@種等待是毫秒級(jí)的,所以使用了while循環(huán) * 如果是新的毫秒是就生成一個(gè)隨機(jī)數(shù),作為sequence的新值,緊接著對(duì)lastTimestamp賦值,然后利用位運(yùn)算生成一個(gè)long的id進(jìn)行返回 */ @Override public synchronized Result get(String key) { long timestamp = timeGen(); if (timestamp < lastTimestamp) { long offset = lastTimestamp - timestamp; if (offset <= 5) { try { wait(offset << 1); timestamp = timeGen(); if (timestamp < lastTimestamp) { return new Result(-1, Status.EXCEPTION); } } catch (InterruptedException e) { LOGGER.error("wait interrupted"); return new Result(-2, Status.EXCEPTION); } } else { return new Result(-3, Status.EXCEPTION); } } if (lastTimestamp == timestamp) { sequence = (sequence + 1) & sequenceMask; if (sequence == 0) { //seq 為0的時(shí)候表示是下一毫秒時(shí)間開始對(duì)seq做隨機(jī) sequence = RANDOM.nextInt(100); timestamp = tilNextMillis(lastTimestamp); } } else { //如果是新的ms開始 sequence = RANDOM.nextInt(100); } lastTimestamp = timestamp; long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence; return new Result(id, Status.SUCCESS); } /** * 通過死循環(huán)的形式確保時(shí)間進(jìn)行了后移,因?yàn)樽疃嘁簿褪峭A粢缓撩耄允褂盟姥h(huán)的形式代價(jià)更低 */ protected long tilNextMillis(long lastTimestamp) { long timestamp = timeGen(); while (timestamp <= lastTimestamp) { timestamp = timeGen(); } return timestamp; } protected long timeGen() { return System.currentTimeMillis(); } public long getWorkerId() { return workerId; } public static void main(String[] args) { Date date = new Date("Mon 6 Jan 1997 13:3:00"); long id = ((System.currentTimeMillis()- date.getTime()) << 22L) | (10 << 12L) | (1) & ~(-1L << 12L); System.out.println(id); System.out.println(10 << 12L); System.out.println(Long.toBinaryString(id)); }}分段id生成器
美團(tuán)實(shí)現(xiàn)的分段id生成器使用的則是mysql數(shù)據(jù)庫完成數(shù)據(jù)的共享,并通過update語句加鎖完成并發(fā)控制。接下來分析一下分段id生成的核心代碼就一個(gè)類
package com.sankuai.inf.leaf.segment;import com.sankuai.inf.leaf.IDGen;import com.sankuai.inf.leaf.common.Result;import com.sankuai.inf.leaf.common.Status;import com.sankuai.inf.leaf.segment.dao.IDAllocDao;import com.sankuai.inf.leaf.segment.model.*;import org.perf4j.StopWatch;import org.perf4j.slf4j.Slf4JStopWatch;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.*;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicLong;public class SegmentIDGenImpl implements IDGen { private static final Logger logger = LoggerFactory.getLogger(SegmentIDGenImpl.class); /** * IDCache未初始化成功時(shí)的異常碼 */ private static final long EXCEPTION_ID_IDCACHE_INIT_FALSE = -1; /** * key不存在時(shí)的異常碼 */ private static final long EXCEPTION_ID_KEY_NOT_EXISTS = -2; /** * SegmentBuffer中的兩個(gè)Segment均未從DB中裝載時(shí)的異常碼 */ private static final long EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL = -3; /** * 最大步長(zhǎng)不超過100,0000 */ private static final int MAX_STEP = 1000000; /** * 一個(gè)Segment維持時(shí)間為15分鐘 * 這里指的是一個(gè)系統(tǒng)默認(rèn)認(rèn)為的合理時(shí)間,主要用于調(diào)整buffer里面步長(zhǎng)的大小,如果當(dāng)前次更新距離上次更新時(shí)間超過15分鐘的話 * 那么步長(zhǎng)就會(huì)動(dòng)態(tài)調(diào)整為二分之一之前的長(zhǎng)度,如果說當(dāng)次更新時(shí)間距離上次更新時(shí)間未超過15分鐘那么說明系統(tǒng)壓力大,那么就適當(dāng)調(diào)整步長(zhǎng)到2倍直到最大步長(zhǎng) * MAX_STEP */ private static final long SEGMENT_DURATION = 15 * 60 * 1000L; /** * 用來更新本地的buffer的線程池,用來更新每個(gè)tag對(duì)應(yīng)的segmentBufferr里面?zhèn)溆玫膕egment */ private ExecutorService service = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new UpdateThreadFactory()); /** * 初始化狀態(tài),主要用來標(biāo)記mysql中tag是否初次被同步進(jìn)入內(nèi)存中 */ private volatile boolean initOK = false; /** * 用來保存每個(gè)tag對(duì)應(yīng)的segmentBuffer,業(yè)務(wù)通過tag進(jìn)行隔離,并且此處使用了并發(fā)安全的容器,主要是防止在刷新tag的時(shí)候出現(xiàn)線程不安全的問題 */ private Map<String, SegmentBuffer> cache = new ConcurrentHashMap<String, SegmentBuffer>(); /** * 主要是用來與mysql打交道,加載tag,加載step,更新maxid */ private IDAllocDao dao; /** * 作者比較優(yōu)秀,為了刷新線程起一個(gè)比較好聽的名字特意寫了個(gè)一個(gè)工廠,哈哈并且內(nèi)部做了一個(gè)線程計(jì)數(shù)的變量 */ public static class UpdateThreadFactory implements ThreadFactory { private static int threadInitNumber = 0; private static synchronized int nextThreadNum() { return threadInitNumber++; } @Override public Thread newThread(Runnable r) { return new Thread(r, "Thread-Segment-Update-" + nextThreadNum()); } } /** * 暴露給外部調(diào)用用來初始化分段id生成器的功能,主要包括更新所有的tag進(jìn)入到內(nèi)存中,并且啟動(dòng)一個(gè)單線程的守護(hù)線程去做定時(shí)刷新這些tag的操作, * 間隔60秒,這里之所以用單線程的線程池我個(gè)人的判斷是為了充分利用阻塞的特性,因?yàn)樵跇O端的情況下60秒加載不完那么就阻塞著在哪里,當(dāng)然,絕大多數(shù)業(yè)務(wù) * 一分鐘肯定是能夠加載完的。 */ @Override public boolean init() { logger.info("Init ..."); // 確保加載到kv后才初始化成功 updateCacheFromDb(); initOK = true; updateCacheFromDbAtEveryMinute(); return initOK; } /** * 刷新緩存的方法。單線程每隔60秒刷新一次tag,與mysql 同步一次tag的信息 */ private void updateCacheFromDbAtEveryMinute() { ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("check-idCache-thread"); t.setDaemon(true); return t; } }); service.scheduleWithFixedDelay(new Runnable() { @Override public void run() { updateCacheFromDb(); } }, 60, 60, TimeUnit.SECONDS); } /** * 從mysql中同步tag的信息進(jìn)入內(nèi)存中,這里作者做的也很巧妙,并不著急立馬就去加載segment我們看到SegmentBuffer里面有一個(gè)初始化是否成功的標(biāo)志字段 * initOk 他標(biāo)志著目前這個(gè)segmentBuffer是否可用,但是這個(gè)方法里面默認(rèn)是false的,作者在這里巧妙的利用了懶加載的方式,將max的值的更改延后,因?yàn)槲覀兯伎家环N弄場(chǎng)景 * leaf在美團(tuán)中可能是全集團(tuán)公用的,可能部署了上百個(gè)節(jié)點(diǎn),那么很有可能這些服務(wù)會(huì)面臨重啟,如果每次重啟都會(huì)默認(rèn)更新mysql的話,一方面會(huì)浪費(fèi)非常多的step的id,另外一方面很有可能 * 就算浪費(fèi)了id也可能會(huì)用不到,因此這里面用戶使用了懶加載的思想只是先進(jìn)行占位,當(dāng)用戶在真正使用的時(shí)候再去查詢并填充segment并更新mysql,因此這里面有個(gè)細(xì)節(jié)就是 * 如果系統(tǒng)極端在乎平滑性,那么在leaf在對(duì)外提供服務(wù)前,先手動(dòng)調(diào)用一次,以確保segment被填充完善,降低延時(shí)性。 */ private void updateCacheFromDb() { logger.info("update cache from db"); StopWatch sw = new Slf4JStopWatch(); try { List<String> dbTags = dao.getAllTags(); if (dbTags == null || dbTags.isEmpty()) { return; } List<String> cacheTags = new ArrayList<String>(cache.keySet()); Set<String> insertTagsSet = new HashSet<>(dbTags); Set<String> removeTagsSet = new HashSet<>(cacheTags); //db中新加的tags灌進(jìn)cache for(int i = 0; i < cacheTags.size(); i++){ String tmp = cacheTags.get(i); if(insertTagsSet.contains(tmp)){ insertTagsSet.remove(tmp); } } for (String tag : insertTagsSet) { SegmentBuffer buffer = new SegmentBuffer(); buffer.setKey(tag); Segment segment = buffer.getCurrent(); segment.setValue(new AtomicLong(0)); segment.setMax(0); segment.setStep(0); cache.put(tag, buffer); logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer); } //cache中已失效的tags從cache刪除 for(int i = 0; i < dbTags.size(); i++){ String tmp = dbTags.get(i); if(removeTagsSet.contains(tmp)){ removeTagsSet.remove(tmp); } } for (String tag : removeTagsSet) { cache.remove(tag); logger.info("Remove tag {} from IdCache", tag); } } catch (Exception e) { logger.warn("update cache from db exception", e); } finally { sw.stop("updateCacheFromDb"); } } /** * 主力接口,用于對(duì)外界提供id * 判斷當(dāng)前tag緩存是否已經(jīng)就緒,如果未就緒直接報(bào)錯(cuò),因此要求調(diào)用方應(yīng)該先調(diào)用init(),進(jìn)行基礎(chǔ)環(huán)境的就緒 * 緩存就緒成功,從緩存中查看客戶端請(qǐng)求的key是否存在,不存在的可能有兩種,一種是mysql中沒有,這個(gè)需要等大概60秒才會(huì)刷新,因此在leaf使用過程中應(yīng)該提前就緒好mysql,讓讓多個(gè)leaf服務(wù)都能刷新到相應(yīng)的key * 另外一種可能就是mysql中也沒有,當(dāng)然也會(huì)造成cache中沒有,兩種情況造成的緩存中沒有,系統(tǒng)都會(huì)返回key不存在,id生成失敗 * 如果緩存中也恰好查到了有key,那么就會(huì)因?yàn)閼屑虞d的原因造成可能segmentBuffer沒有初始化,(任何事情都有兩面性) * 我們看到美團(tuán)的處理方式是通過鎖定對(duì)應(yīng)的segmentBuffer的對(duì)象頭,可以說也是無所不用其極的減低鎖粒度,不得不說一句nice * 另外我們看到使用了雙重檢查,防止并發(fā)問題,這里多啰嗦一句為什么回出現(xiàn)并發(fā)問題,兩個(gè)線程都到了synchronized的臨界區(qū)后,一個(gè)線程拿到了buffer的頭鎖,進(jìn)入可能去更新mysql了,如果他執(zhí)行完他會(huì)放開頭鎖 * 但是如果不通過判斷那么他也會(huì)繼續(xù)執(zhí)行更新mysql的操作,因此造成不滿足我們預(yù)期的事情發(fā)生了,所以這里通過一個(gè)initok的一個(gè)標(biāo)志進(jìn)行雙重判定,那么就算是第二個(gè)線程進(jìn)入后因?yàn)榈谝粋€(gè)線程退出前就更新了linitok為true * 所以第二個(gè)線程進(jìn)來后還是不能更新mysql就安全出去臨界區(qū)了。 * */ @Override public Result get(final String key) { if (!initOK) { return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION); } if (cache.containsKey(key)) { SegmentBuffer buffer = cache.get(key); if (!buffer.isInitOk()) { synchronized (buffer) { if (!buffer.isInitOk()) { try { updateSegmentFromDb(key, buffer.getCurrent()); logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent()); buffer.setInitOk(true); } catch (Exception e) { logger.warn("Init buffer {} exception", buffer.getCurrent(), e); } } } } return getIdFromSegmentBuffer(cache.get(key)); } return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION); } /** * 這個(gè)方法主要是用來更新并填充好,指定key對(duì)應(yīng)的SegmentBuffer * StopWatch 是一個(gè)計(jì)時(shí)器,作者考慮到這個(gè)方法的性能問題,因此加了一個(gè)監(jiān)控 * 先是判斷指定的segmentBuffer是否初始化完成,如果沒有初始化完成也就是說沒有向數(shù)據(jù)庫去申請(qǐng)id段,那么就去取申請(qǐng)并填充進(jìn)segmentBuffer * 如果是已經(jīng)初始化完成了,第二個(gè)分支其實(shí)特定指的是第二次申請(qǐng) */ public void updateSegmentFromDb(String key, Segment segment) { StopWatch sw = new Slf4JStopWatch(); SegmentBuffer buffer = segment.getBuffer(); LeafAlloc leafAlloc; //第一次申請(qǐng)id段 if (!buffer.isInitOk()) { leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key); buffer.setStep(leafAlloc.getStep()); buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step為DB中的step } else if (buffer.getUpdateTimestamp() == 0) { //第二次申請(qǐng)id段,因?yàn)橹暗牡谝淮紊暾?qǐng)動(dòng)作談不上更新,因此在第二次的時(shí)候?qū)⒏聲r(shí)間進(jìn)行填充 leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key); buffer.setUpdateTimestamp(System.currentTimeMillis()); buffer.setStep(leafAlloc.getStep()); buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step為DB中的step } else { //第N次申請(qǐng)id段動(dòng)態(tài)優(yōu)化步長(zhǎng),我們看到有一個(gè)指定的時(shí)間以15分鐘為例,如果兩次領(lǐng)取間隔少于15分鐘那么就將step拉大一倍,但是不會(huì)超過系統(tǒng)默認(rèn)的10W的step // 這樣做的好處其實(shí)也是降低mysql壓力 //如果兩次申請(qǐng)的超過30分鐘那么就將步長(zhǎng)調(diào)整為原來的一半,但是不會(huì)小于最小步長(zhǎng) long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp(); int nextStep = buffer.getStep(); if (duration < SEGMENT_DURATION) { if (nextStep * 2 > MAX_STEP) { //do nothing } else { nextStep = nextStep * 2; } } else if (duration < SEGMENT_DURATION * 2) { //do nothing with nextStep } else { nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep; } logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep); LeafAlloc temp = new LeafAlloc(); temp.setKey(key); temp.setStep(nextStep); leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp); buffer.setUpdateTimestamp(System.currentTimeMillis()); buffer.setStep(nextStep); buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step為DB中的step } // must set value before set max /** * 此處很坑,這是第一版本留下的無效注釋 * https://github.com/Meituan-Dianping/Leaf/issues/16 * 可以不用強(qiáng)制要求的,因?yàn)槭菃尉€程更新,并且buffer還沒有就緒因此不存在優(yōu)先可見的問題 */ long value = leafAlloc.getMaxId() - buffer.getStep(); segment.getValue().set(value); segment.setMax(leafAlloc.getMaxId()); segment.setStep(buffer.getStep()); sw.stop("updateSegmentFromDb", key + " " + segment); } /** * 核心處理方法 * 通過讀寫鎖提升并發(fā),讀鎖主要負(fù)責(zé)id的自增,但是如果只是自增那么靠automic操作就夠,因此還涉及到segment的切換,因此此處使用了讀寫鎖進(jìn)行分離 * 當(dāng)需要切換segment的時(shí)候讀鎖也會(huì)被掛起來,因?yàn)槿绻粧炱鸬脑挄?huì)出現(xiàn)臟讀。 * 方法的核心思想總結(jié)如下 * 通過while循環(huán),死循環(huán)的去取數(shù)據(jù),先是拿到讀鎖,此處總結(jié)一下 JUC包里面的讀寫鎖的特性,讀讀可并行,讀寫不可并行,寫寫不可并行。 * 在這里從概念上先完成梳理 * 1、備用buffer的更新是由單線程完成的,這里面是通過cas更新ThreadRunning實(shí)現(xiàn)的,因此備用buffer的更新是安全的 * 2、id的自增是通過AutomicLong實(shí)現(xiàn)的因此也不存在自增時(shí)候的線程安全問題 * 3、主備buffer的切換是由讀寫鎖來進(jìn)行控制的,讀鎖生效時(shí)候時(shí)候要么能夠自增成功則返回,要么自增不成功,線程開始搶寫鎖,如果搶上,那么新來的讀鎖請(qǐng)求就會(huì)被掛起, * 直到寫鎖完成buffer的切換,然后通過while循環(huán)自增后返回id */ public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) { while (true) { buffer.rLock().lock(); try { final Segment segment = buffer.getCurrent(); if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) { service.execute(new Runnable() { @Override public void run() { Segment next = buffer.getSegments()[buffer.nextPos()]; boolean updateOk = false; try { updateSegmentFromDb(buffer.getKey(), next); updateOk = true; logger.info("update segment {} from db {}", buffer.getKey(), next); } catch (Exception e) { logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e); } finally { if (updateOk) { buffer.wLock().lock(); buffer.setNextReady(true); buffer.getThreadRunning().set(false); buffer.wLock().unlock(); } else { buffer.getThreadRunning().set(false); } } } }); } long value = segment.getValue().getAndIncrement(); if (value < segment.getMax()) { return new Result(value, Status.SUCCESS); } } finally { buffer.rLock().unlock(); } waitAndSleep(buffer); buffer.wLock().lock(); try { //這里進(jìn)行這么判斷是因?yàn)榭赡苡卸鄠€(gè)寫鎖排隊(duì)在這里,一個(gè)寫鎖更新成了后,那么后面的線程直接取就好,不需要走后續(xù)的修改操作了。 final Segment segment = buffer.getCurrent(); long value = segment.getValue().getAndIncrement(); if (value < segment.getMax()) { return new Result(value, Status.SUCCESS); } //檢查備用buffer是否完成了準(zhǔn)備,如果準(zhǔn)備完成則進(jìn)行切換,如果未準(zhǔn)備完成則拋出異常代表主buffer還有從buffer都沒有準(zhǔn)備好,系統(tǒng)暫時(shí)不可用。 //產(chǎn)生的原因可能是刷新線程池阻塞,這可能性還是蠻小的,這也是為什么中間件作者在 update代碼段加入 stopwatch監(jiān)控的原因吧。 if (buffer.isNextReady()) { buffer.switchPos(); buffer.setNextReady(false); } else { logger.error("Both two segments in {} are not ready!", buffer); return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION); } } finally { buffer.wLock().unlock(); } } } /** * 讓當(dāng)前線程進(jìn)入死循環(huán)等待,為了降低無效的cpu輪訓(xùn)如果循環(huán)次數(shù)超過一萬后就休眠10ms */ private void waitAndSleep(SegmentBuffer buffer) { int roll = 0; while (buffer.getThreadRunning().get()) { roll += 1; if(roll > 10000) { try { TimeUnit.MILLISECONDS.sleep(10); break; } catch (InterruptedException e) { logger.warn("Thread {} Interrupted",Thread.currentThread().getName()); break; } } } } public List<LeafAlloc> getAllLeafAllocs() { return dao.getAllLeafAllocs(); } public Map<String, SegmentBuffer> getCache() { return cache; } public IDAllocDao getDao() { return dao; } public void setDao(IDAllocDao dao) { this.dao = dao; }}
以上就是關(guān)于pos機(jī)系統(tǒng)源碼,美團(tuán)leaf源碼解析的知識(shí),后面我們會(huì)繼續(xù)為大家整理關(guān)于pos機(jī)系統(tǒng)源碼的知識(shí),希望能夠幫助到大家!









